瀏覽代碼

[Feature] Allow exception when choosing upstream

tags/2.2
Vsevolod Stakhov 4 年之前
父節點
當前提交
234923a5ff
共有 2 個檔案被更改,包括 64 行新增22 行删除
  1. 53
    22
      src/libutil/upstream.c
  2. 11
    0
      src/libutil/upstream.h

+ 53
- 22
src/libutil/upstream.c 查看文件

@@ -1310,18 +1310,30 @@ rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
}

static struct upstream*
rspamd_upstream_get_random (struct upstream_list *ups)
rspamd_upstream_get_random (struct upstream_list *ups,
struct upstream *except)
{
guint idx = ottery_rand_range (ups->alive->len - 1);
for (;;) {
guint idx = ottery_rand_range (ups->alive->len - 1);
struct upstream *up;

return g_ptr_array_index (ups->alive, idx);
up = g_ptr_array_index (ups->alive, idx);

if (except && up == except) {
continue;
}

return up;
}
}

static struct upstream*
rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
rspamd_upstream_get_round_robin (struct upstream_list *ups,
struct upstream *except,
gboolean use_cur)
{
guint max_weight = 0, min_checked = G_MAXUINT;
struct upstream *up, *selected = NULL, *min_checked_sel = NULL;
struct upstream *up = NULL, *selected = NULL, *min_checked_sel = NULL;
guint i;

/* Select upstream with the maximum cur_weight */
@@ -1329,6 +1341,11 @@ rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)

for (i = 0; i < ups->alive->len; i ++) {
up = g_ptr_array_index (ups->alive, i);

if (except != NULL && up == except) {
continue;
}

if (use_cur) {
if (up->cur_weight > max_weight) {
selected = up;
@@ -1396,18 +1413,15 @@ rspamd_consistent_hash (guint64 key, guint32 nbuckets)
}

static struct upstream*
rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint keylen)
rspamd_upstream_get_hashed (struct upstream_list *ups,
struct upstream *except,
const guint8 *key, guint keylen)
{
guint64 k;
guint32 idx;
static const guint max_tries = 20;
struct upstream *up = NULL;

if (ups->alive->len == 1) {
/* Fast path */
return g_ptr_array_index (ups->alive, 0);
}

/* Generate 64 bits input key */
k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64,
key, keylen, ups->hash_seed);
@@ -1420,8 +1434,8 @@ rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint
idx = rspamd_consistent_hash (k, ups->ups->len);
up = g_ptr_array_index (ups->ups, idx);

if (up->active_idx < 0) {
/* Found inactive upstream */
if (up->active_idx < 0 || (except != NULL && up == except)) {
/* Found inactive or excluded upstream */
k = mum_hash_step (k, ups->hash_seed);
}
else {
@@ -1435,7 +1449,7 @@ rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint
}

/* We failed to find any active upstream */
up = rspamd_upstream_get_random (ups);
up = rspamd_upstream_get_random (ups, except);
msg_info ("failed to find hashed upstream for %s, fallback to random: %s",
ups->ups_line, up->name);

@@ -1444,8 +1458,10 @@ rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint

static struct upstream*
rspamd_upstream_get_common (struct upstream_list *ups,
enum rspamd_upstream_rotation default_type,
const guchar *key, gsize keylen, gboolean forced)
struct upstream* except,
enum rspamd_upstream_rotation default_type,
const guchar *key, gsize keylen,
gboolean forced)
{
enum rspamd_upstream_rotation type;
struct upstream *up = NULL;
@@ -1459,6 +1475,12 @@ rspamd_upstream_get_common (struct upstream_list *ups,
}
RSPAMD_UPSTREAM_UNLOCK (ups);

if (ups->alive->len == 1) {
/* Fast path */
up = g_ptr_array_index (ups->alive, 0);
goto end;
}

if (!forced) {
type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type;
}
@@ -1474,16 +1496,16 @@ rspamd_upstream_get_common (struct upstream_list *ups,
switch (type) {
default:
case RSPAMD_UPSTREAM_RANDOM:
up = rspamd_upstream_get_random (ups);
up = rspamd_upstream_get_random (ups, except);
break;
case RSPAMD_UPSTREAM_HASHED:
up = rspamd_upstream_get_hashed (ups, key, keylen);
up = rspamd_upstream_get_hashed (ups, except, key, keylen);
break;
case RSPAMD_UPSTREAM_ROUND_ROBIN:
up = rspamd_upstream_get_round_robin (ups, TRUE);
up = rspamd_upstream_get_round_robin (ups, except, TRUE);
break;
case RSPAMD_UPSTREAM_MASTER_SLAVE:
up = rspamd_upstream_get_round_robin (ups, FALSE);
up = rspamd_upstream_get_round_robin (ups, except, FALSE);
break;
case RSPAMD_UPSTREAM_SEQUENTIAL:
if (ups->cur_elt >= ups->alive->len) {
@@ -1495,6 +1517,7 @@ rspamd_upstream_get_common (struct upstream_list *ups,
break;
}

end:
if (up) {
up->checked ++;
}
@@ -1507,7 +1530,7 @@ rspamd_upstream_get (struct upstream_list *ups,
enum rspamd_upstream_rotation default_type,
const guchar *key, gsize keylen)
{
return rspamd_upstream_get_common (ups, default_type, key, keylen, FALSE);
return rspamd_upstream_get_common (ups, NULL, default_type, key, keylen, FALSE);
}

struct upstream*
@@ -1515,7 +1538,15 @@ rspamd_upstream_get_forced (struct upstream_list *ups,
enum rspamd_upstream_rotation forced_type,
const guchar *key, gsize keylen)
{
return rspamd_upstream_get_common (ups, forced_type, key, keylen, TRUE);
return rspamd_upstream_get_common (ups, NULL, forced_type, key, keylen, TRUE);
}

struct upstream *rspamd_upstream_get_except (struct upstream_list *ups,
struct upstream *except,
enum rspamd_upstream_rotation default_type,
const guchar *key, gsize keylen)
{
return rspamd_upstream_get_common (ups, except, default_type, key, keylen, FALSE);
}

void

+ 11
- 0
src/libutil/upstream.h 查看文件

@@ -283,6 +283,17 @@ struct upstream *rspamd_upstream_get_forced (struct upstream_list *ups,
enum rspamd_upstream_rotation forced_type,
const guchar *key, gsize keylen);

/**
* Get new upstream from the list excepting the upstream specified
* @param ups upstream list
* @param type type of rotation algorithm, for `RSPAMD_UPSTREAM_HASHED` it is required to specify `key` and `keylen` as arguments
* @return
*/
struct upstream *rspamd_upstream_get_except (struct upstream_list *ups,
struct upstream *except,
enum rspamd_upstream_rotation default_type,
const guchar *key, gsize keylen);

/**
* Re-resolve addresses for all upstreams registered
*/

Loading…
取消
儲存