Переглянути джерело

[Fix] Core: Fix address rotation bug

Previously, upstream.get_addr function returned the new address of the
upstream. Unfortunately, it was used for printing addresses. It caused
the following situation: let's imagine we have A1 and A2 where A1 was
initially selected. So the connection was performed to A1:

                           Current addr   Selected addr

   Connect+---------+      A2+------>A1   A1
                    |
+-+Print failure<---+      A1+------>A2   A2
|                        +----+
+->Mark failure+-------->+ A2 |
                         +----+

But the failure OP as well as log message told about `A2` where the real
problem happened with `A1`.

This commit adds distinguishing between getting the next and the current
address of the upstream resolving this issue.
tags/1.9.0
Vsevolod Stakhov 5 роки тому
джерело
коміт
0f32df6f44

+ 2
- 2
src/fuzzy_storage.c Переглянути файл

@@ -568,7 +568,7 @@ fuzzy_mirror_error_handler (struct rspamd_http_connection *conn, GError *err)
msg_info ("abnormally closing connection from backend: %s:%s, "
"error: %e",
bk_conn->mirror->name,
rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)),
rspamd_inet_address_to_string (rspamd_upstream_addr_cur (bk_conn->up)),
err);

fuzzy_mirror_close_connection (bk_conn);
@@ -604,7 +604,7 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx,
}

conn->sock = rspamd_inet_address_connect (
rspamd_upstream_addr (conn->up),
rspamd_upstream_addr_next (conn->up),
SOCK_STREAM, TRUE);

if (conn->sock == -1) {

+ 1
- 1
src/libserver/dns.c Переглянути файл

@@ -256,7 +256,7 @@ rspamd_dns_server_init (struct upstream *up, guint idx, gpointer ud)
void *serv;
struct rdns_upstream_elt *elt;

addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);

if (r->cfg) {
serv = rdns_resolver_add_server (r->r, rspamd_inet_address_to_string (addr),

+ 4
- 4
src/libserver/fuzzy_backend_redis.c Переглянути файл

@@ -648,7 +648,7 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk,
0);

session->up = up;
addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
g_assert (addr != NULL);
session->ctx = rspamd_redis_pool_connect (backend->pool,
backend->dbname, backend->password,
@@ -774,7 +774,7 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk,
0);

session->up = up;
addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
g_assert (addr != NULL);
session->ctx = rspamd_redis_pool_connect (backend->pool,
backend->dbname, backend->password,
@@ -899,7 +899,7 @@ rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk,
0);

session->up = up;
addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
g_assert (addr != NULL);
session->ctx = rspamd_redis_pool_connect (backend->pool,
backend->dbname, backend->password,
@@ -1459,7 +1459,7 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk,
0);

session->up = up;
addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
g_assert (addr != NULL);
session->ctx = rspamd_redis_pool_connect (backend->pool,
backend->dbname, backend->password,

+ 3
- 3
src/libstat/backends/redis_backend.c Переглянути файл

@@ -975,7 +975,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
0);

g_assert (cbdata->selected != NULL);
addr = rspamd_upstream_addr (cbdata->selected);
addr = rspamd_upstream_addr_next (cbdata->selected);
g_assert (addr != NULL);

if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
@@ -1522,7 +1522,7 @@ rspamd_redis_runtime (struct rspamd_task *task,
rt->stcf = stcf;
rt->redis_object_expanded = object_expanded;

addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
g_assert (addr != NULL);

if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
@@ -1693,7 +1693,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens,
}
}

addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
g_assert (addr != NULL);

if (rspamd_inet_address_get_af (addr) == AF_UNIX) {

+ 1
- 1
src/libstat/learn_cache/redis_cache.c Переглянути файл

@@ -385,7 +385,7 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
rt->task = task;
rt->ctx = ctx;

addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
g_assert (addr != NULL);

if (rspamd_inet_address_get_af (addr) == AF_UNIX) {

+ 7
- 1
src/libutil/upstream.c Переглянути файл

@@ -630,7 +630,7 @@ rspamd_upstream_dtor (struct upstream *up)
}

rspamd_inet_addr_t*
rspamd_upstream_addr (struct upstream *up)
rspamd_upstream_addr_next (struct upstream *up)
{
guint idx, next_idx;
struct upstream_addr_elt *e1, *e2;
@@ -646,6 +646,12 @@ rspamd_upstream_addr (struct upstream *up)
return e2->addr;
}

rspamd_inet_addr_t*
rspamd_upstream_addr_cur (const struct upstream *up)
{
return g_ptr_array_index (up->addrs.addr, up->addrs.cur);
}

const gchar*
rspamd_upstream_name (struct upstream *up)
{

+ 8
- 1
src/libutil/upstream.h Переглянути файл

@@ -207,12 +207,19 @@ void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
GFreeFunc free_func,
gpointer ud);

/**
* Returns the next IP address of the upstream (internal rotation)
* @param up
* @return
*/
rspamd_inet_addr_t* rspamd_upstream_addr_next (struct upstream *up);

/**
* Returns the current IP address of the upstream
* @param up
* @return
*/
rspamd_inet_addr_t* rspamd_upstream_addr (struct upstream *up);
rspamd_inet_addr_t* rspamd_upstream_addr_cur (const struct upstream *up);

/**
* Add custom address for an upstream (ownership of addr is transferred to upstream)

+ 1
- 1
src/lua/lua_upstream.c Переглянути файл

@@ -110,7 +110,7 @@ lua_upstream_get_addr (lua_State *L)
struct upstream *up = lua_check_upstream (L);

if (up) {
rspamd_lua_ip_push (L, rspamd_upstream_addr (up));
rspamd_lua_ip_push (L, rspamd_upstream_addr_next (up));
}
else {
lua_pushnil (L);

+ 7
- 7
src/plugins/fuzzy_check.c Переглянути файл

@@ -2209,7 +2209,7 @@ fuzzy_check_io_callback (gint fd, short what, void *arg)
msg_err_task ("got error on IO with server %s(%s), on %s, %d, %s",
rspamd_upstream_name (session->server),
rspamd_inet_address_to_string_pretty (
rspamd_upstream_addr (session->server)),
rspamd_upstream_addr_cur (session->server)),
session->state == 1 ? "read" : "write",
errno,
strerror (errno));
@@ -2255,7 +2255,7 @@ fuzzy_check_timer_callback (gint fd, short what, void *arg)
msg_err_task ("got IO timeout with server %s(%s), after %d retransmits",
rspamd_upstream_name (session->server),
rspamd_inet_address_to_string_pretty (
rspamd_upstream_addr (session->server)),
rspamd_upstream_addr_cur (session->server)),
session->retransmits);
rspamd_upstream_fail (session->server, FALSE);
if (session->item) {
@@ -2464,7 +2464,7 @@ fuzzy_controller_io_callback (gint fd, short what, void *arg)
msg_err_task ("got error in IO with server %s(%s), %d, %s",
rspamd_upstream_name (session->server),
rspamd_inet_address_to_string_pretty (
rspamd_upstream_addr (session->server)),
rspamd_upstream_addr_cur (session->server)),
errno, strerror (errno));
rspamd_upstream_fail (session->server, FALSE);
}
@@ -2568,7 +2568,7 @@ fuzzy_controller_timer_callback (gint fd, short what, void *arg)
"after %d retransmits",
rspamd_upstream_name (session->server),
rspamd_inet_address_to_string_pretty (
rspamd_upstream_addr (session->server)),
rspamd_upstream_addr_cur (session->server)),
session->retransmits);

if (session->session) {
@@ -2725,7 +2725,7 @@ register_fuzzy_client_call (struct rspamd_task *task,
selected = rspamd_upstream_get (rule->servers, RSPAMD_UPSTREAM_ROUND_ROBIN,
NULL, 0);
if (selected) {
addr = rspamd_upstream_addr (selected);
addr = rspamd_upstream_addr_next (selected);
if ((sock = rspamd_inet_address_connect (addr, SOCK_DGRAM, TRUE)) == -1) {
msg_warn_task ("cannot connect to %s(%s), %d, %s",
rspamd_upstream_name (selected),
@@ -2853,7 +2853,7 @@ register_fuzzy_controller_call (struct rspamd_http_connection_entry *entry,
while ((selected = rspamd_upstream_get (rule->servers,
RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
/* Create UDP socket */
addr = rspamd_upstream_addr (selected);
addr = rspamd_upstream_addr_next (selected);

if ((sock = rspamd_inet_address_connect (addr,
SOCK_DGRAM, TRUE)) == -1) {
@@ -3216,7 +3216,7 @@ fuzzy_check_send_lua_learn (struct fuzzy_rule *rule,
while ((selected = rspamd_upstream_get (rule->servers,
RSPAMD_UPSTREAM_SEQUENTIAL, NULL, 0))) {
/* Create UDP socket */
addr = rspamd_upstream_addr (selected);
addr = rspamd_upstream_addr_next (selected);

if ((sock = rspamd_inet_address_connect (addr,
SOCK_DGRAM, TRUE)) == -1) {

+ 3
- 2
src/plugins/surbl.c Переглянути файл

@@ -1624,7 +1624,8 @@ surbl_redirector_error (struct rspamd_http_connection *conn,

task = param->task;
msg_err_surbl ("connection with http server %s terminated incorrectly: %e",
rspamd_inet_address_to_string (rspamd_upstream_addr (param->redirector)),
rspamd_inet_address_to_string (
rspamd_upstream_addr_cur (param->redirector)),
err);
rspamd_upstream_fail (param->redirector, FALSE);
rspamd_session_remove_event (param->task->s, free_redirector_session,
@@ -1715,7 +1716,7 @@ register_redirector_call (struct rspamd_url *url, struct rspamd_task *task,
RSPAMD_UPSTREAM_ROUND_ROBIN, url->host, url->hostlen);

if (selected) {
s = rspamd_inet_address_connect (rspamd_upstream_addr (selected),
s = rspamd_inet_address_connect (rspamd_upstream_addr_next (selected),
SOCK_STREAM, TRUE);
}


+ 15
- 10
src/rspamd_proxy.c Переглянути файл

@@ -1309,7 +1309,8 @@ proxy_backend_mirror_error_handler (struct rspamd_http_connection *conn, GError
msg_info_session ("abnormally closing connection from backend: %s:%s, "
"error: %e",
bk_conn->name,
rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)),
rspamd_inet_address_to_string (
rspamd_upstream_addr_cur (bk_conn->up)),
err);

if (err) {
@@ -1337,7 +1338,8 @@ proxy_backend_mirror_finish_handler (struct rspamd_http_connection *conn,
bk_conn->parser_from_ref, msg->body_buf.begin, msg->body_buf.len)) {
msg_warn_session ("cannot parse results from the mirror backend %s:%s",
bk_conn->name,
rspamd_inet_address_to_string (rspamd_upstream_addr (bk_conn->up)));
rspamd_inet_address_to_string (
rspamd_upstream_addr_cur (bk_conn->up)));
bk_conn->err = "cannot parse ucl";
}

@@ -1387,7 +1389,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
}

bk_conn->backend_sock = rspamd_inet_address_connect (
rspamd_upstream_addr (bk_conn->up),
rspamd_upstream_addr_next (bk_conn->up),
SOCK_STREAM, TRUE);

if (bk_conn->backend_sock == -1) {
@@ -1432,7 +1434,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)

if (m->local ||
rspamd_inet_address_is_local (
rspamd_upstream_addr (bk_conn->up), FALSE)) {
rspamd_upstream_addr_cur (bk_conn->up), FALSE)) {

if (session->fname) {
rspamd_http_message_add_header (msg, "File", session->fname);
@@ -1509,7 +1511,8 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError
session = bk_conn->s;
msg_info_session ("abnormally closing connection from backend: %s, error: %e,"
" retries left: %d",
rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
rspamd_inet_address_to_string (
rspamd_upstream_addr_cur (session->master_conn->up)),
err,
session->ctx->max_retries - session->retries);
session->retries ++;
@@ -1531,7 +1534,7 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError
msg_info_session ("retry connection to: %s"
" retries left: %d",
rspamd_inet_address_to_string (
rspamd_upstream_addr (session->master_conn->up)),
rspamd_upstream_addr_cur (session->master_conn->up)),
session->ctx->max_retries - session->retries);
}
}
@@ -1821,14 +1824,15 @@ retry:
}

session->master_conn->backend_sock = rspamd_inet_address_connect (
rspamd_upstream_addr (session->master_conn->up),
rspamd_upstream_addr_next (session->master_conn->up),
SOCK_STREAM, TRUE);

if (session->master_conn->backend_sock == -1) {
msg_err_session ("cannot connect upstream: %s(%s)",
host ? hostbuf : "default",
rspamd_inet_address_to_string (rspamd_upstream_addr (
session->master_conn->up)));
rspamd_inet_address_to_string (
rspamd_upstream_addr_cur (
session->master_conn->up)));
rspamd_upstream_fail (session->master_conn->up, TRUE);
session->retries ++;
goto retry;
@@ -1872,7 +1876,8 @@ retry:

if (backend->local ||
rspamd_inet_address_is_local (
rspamd_upstream_addr (session->master_conn->up), FALSE)) {
rspamd_upstream_addr_cur (
session->master_conn->up), FALSE)) {

if (session->fname) {
rspamd_http_message_add_header (msg, "File", session->fname);

+ 10
- 10
test/rspamd_upstream_test.c Переглянути файл

@@ -87,27 +87,27 @@ rspamd_upstream_test_func (void)
rspamd_parse_inet_address (&paddr, "::1", 0);
g_assert (rspamd_upstream_add_addr (up, paddr));
/* Rewind to start */
addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
addr = rspamd_upstream_addr_next (up);
/* cur should be zero here */
addr = rspamd_upstream_addr (up);
next_addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
next_addr = rspamd_upstream_addr_next (up);
g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
next_addr = rspamd_upstream_addr (up);
next_addr = rspamd_upstream_addr_next (up);
g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6);
next_addr = rspamd_upstream_addr (up);
next_addr = rspamd_upstream_addr_next (up);
g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
next_addr = rspamd_upstream_addr (up);
next_addr = rspamd_upstream_addr_next (up);
g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
next_addr = rspamd_upstream_addr (up);
next_addr = rspamd_upstream_addr_next (up);
g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET6);
/* Test errors with IPv6 */
rspamd_upstream_fail (up, TRUE);
/* Now we should have merely IPv4 addresses in rotation */
addr = rspamd_upstream_addr (up);
addr = rspamd_upstream_addr_next (up);
for (i = 0; i < 256; i++) {
next_addr = rspamd_upstream_addr (up);
next_addr = rspamd_upstream_addr_next (up);
g_assert (rspamd_inet_address_get_af (addr) == AF_INET);
g_assert (rspamd_inet_address_get_af (next_addr) == AF_INET);
g_assert (rspamd_inet_address_compare (addr, next_addr) != 0);

Завантаження…
Відмінити
Зберегти