You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

upstream.c 42KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "upstream.h"
  18. #include "ottery.h"
  19. #include "ref.h"
  20. #include "cfg_file.h"
  21. #include "rdns.h"
  22. #include "cryptobox.h"
  23. #include "utlist.h"
  24. #include "logger.h"
  25. #include "contrib/librdns/rdns.h"
  26. #include "contrib/mumhash/mum.h"
  27. #include <math.h>
  28. struct upstream_inet_addr_entry {
  29. rspamd_inet_addr_t *addr;
  30. guint priority;
  31. struct upstream_inet_addr_entry *next;
  32. };
  33. struct upstream_addr_elt {
  34. rspamd_inet_addr_t *addr;
  35. guint priority;
  36. guint errors;
  37. };
  38. struct upstream_list_watcher {
  39. rspamd_upstream_watch_func func;
  40. GFreeFunc dtor;
  41. gpointer ud;
  42. enum rspamd_upstreams_watch_event events_mask;
  43. struct upstream_list_watcher *next, *prev;
  44. };
  45. struct upstream {
  46. guint weight;
  47. guint cur_weight;
  48. guint errors;
  49. guint checked;
  50. guint dns_requests;
  51. gint active_idx;
  52. guint ttl;
  53. gchar *name;
  54. ev_timer ev;
  55. gdouble last_fail;
  56. gdouble last_resolve;
  57. gpointer ud;
  58. enum rspamd_upstream_flag flags;
  59. struct upstream_list *ls;
  60. GList *ctx_pos;
  61. struct upstream_ctx *ctx;
  62. struct {
  63. GPtrArray *addr; /* struct upstream_addr_elt */
  64. guint cur;
  65. } addrs;
  66. struct upstream_inet_addr_entry *new_addrs;
  67. gpointer data;
  68. gchar uid[8];
  69. ref_entry_t ref;
  70. #ifdef UPSTREAMS_THREAD_SAFE
  71. rspamd_mutex_t *lock;
  72. #endif
  73. };
  74. struct upstream_limits {
  75. gdouble revive_time;
  76. gdouble revive_jitter;
  77. gdouble error_time;
  78. gdouble dns_timeout;
  79. gdouble lazy_resolve_time;
  80. guint max_errors;
  81. guint dns_retransmits;
  82. };
  83. struct upstream_list {
  84. gchar *ups_line;
  85. struct upstream_ctx *ctx;
  86. GPtrArray *ups;
  87. GPtrArray *alive;
  88. struct upstream_list_watcher *watchers;
  89. guint64 hash_seed;
  90. const struct upstream_limits *limits;
  91. enum rspamd_upstream_flag flags;
  92. guint cur_elt;
  93. enum rspamd_upstream_rotation rot_alg;
  94. #ifdef UPSTREAMS_THREAD_SAFE
  95. rspamd_mutex_t *lock;
  96. #endif
  97. };
  98. struct upstream_ctx {
  99. struct rdns_resolver *res;
  100. struct ev_loop *event_loop;
  101. struct upstream_limits limits;
  102. GQueue *upstreams;
  103. gboolean configured;
  104. rspamd_mempool_t *pool;
  105. ref_entry_t ref;
  106. };
  107. #ifndef UPSTREAMS_THREAD_SAFE
  108. #define RSPAMD_UPSTREAM_LOCK(x) do { } while (0)
  109. #define RSPAMD_UPSTREAM_UNLOCK(x) do { } while (0)
  110. #else
  111. #define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x->lock)
  112. #define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x->lock)
  113. #endif
  114. #define msg_debug_upstream(...) rspamd_conditional_debug_fast (NULL, NULL, \
  115. rspamd_upstream_log_id, "upstream", upstream->uid, \
  116. G_STRFUNC, \
  117. __VA_ARGS__)
  118. #define msg_info_upstream(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
  119. "upstream", upstream->uid, \
  120. G_STRFUNC, \
  121. __VA_ARGS__)
  122. INIT_LOG_MODULE(upstream)
  123. /* 4 errors in 10 seconds */
  124. #define DEFAULT_MAX_ERRORS 4
  125. static const guint default_max_errors = DEFAULT_MAX_ERRORS;
  126. #define DEFAULT_REVIVE_TIME 60
  127. static const gdouble default_revive_time = DEFAULT_REVIVE_TIME;
  128. #define DEFAULT_REVIVE_JITTER 0.4
  129. static const gdouble default_revive_jitter = DEFAULT_REVIVE_JITTER;
  130. #define DEFAULT_ERROR_TIME 10
  131. static const gdouble default_error_time = DEFAULT_ERROR_TIME;
  132. #define DEFAULT_DNS_TIMEOUT 1.0
  133. static const gdouble default_dns_timeout = DEFAULT_DNS_TIMEOUT;
  134. #define DEFAULT_DNS_RETRANSMITS 2
  135. static const guint default_dns_retransmits = DEFAULT_DNS_RETRANSMITS;
  136. /* TODO: make it configurable */
  137. #define DEFAULT_LAZY_RESOLVE_TIME 3600.0
  138. static const gdouble default_lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME;
  139. static const struct upstream_limits default_limits = {
  140. .revive_time = DEFAULT_REVIVE_TIME,
  141. .revive_jitter = DEFAULT_REVIVE_JITTER,
  142. .error_time = DEFAULT_ERROR_TIME,
  143. .dns_timeout = DEFAULT_DNS_TIMEOUT,
  144. .dns_retransmits = DEFAULT_DNS_RETRANSMITS,
  145. .max_errors = DEFAULT_MAX_ERRORS,
  146. .lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME,
  147. };
  148. static void rspamd_upstream_lazy_resolve_cb (struct ev_loop *, ev_timer *, int );
  149. void
  150. rspamd_upstreams_library_config (struct rspamd_config *cfg,
  151. struct upstream_ctx *ctx,
  152. struct ev_loop *event_loop,
  153. struct rdns_resolver *resolver)
  154. {
  155. g_assert (ctx != NULL);
  156. g_assert (cfg != NULL);
  157. if (cfg->upstream_error_time) {
  158. ctx->limits.error_time = cfg->upstream_error_time;
  159. }
  160. if (cfg->upstream_max_errors) {
  161. ctx->limits.max_errors = cfg->upstream_max_errors;
  162. }
  163. if (cfg->upstream_revive_time) {
  164. ctx->limits.revive_time = cfg->upstream_max_errors;
  165. }
  166. if (cfg->upstream_lazy_resolve_time) {
  167. ctx->limits.lazy_resolve_time = cfg->upstream_lazy_resolve_time;
  168. }
  169. if (cfg->dns_retransmits) {
  170. ctx->limits.dns_retransmits = cfg->dns_retransmits;
  171. }
  172. if (cfg->dns_timeout) {
  173. ctx->limits.dns_timeout = cfg->dns_timeout;
  174. }
  175. ctx->event_loop = event_loop;
  176. ctx->res = resolver;
  177. ctx->configured = TRUE;
  178. /* Start lazy resolving */
  179. if (event_loop && resolver) {
  180. GList *cur;
  181. struct upstream *upstream;
  182. cur = ctx->upstreams->head;
  183. while (cur) {
  184. upstream = cur->data;
  185. if (!ev_can_stop (&upstream->ev) && upstream->ls &&
  186. !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  187. gdouble when;
  188. if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
  189. /* Resolve them immediately ! */
  190. when = 0.0;
  191. }
  192. else {
  193. when = rspamd_time_jitter (upstream->ls->limits->lazy_resolve_time,
  194. upstream->ls->limits->lazy_resolve_time * .1);
  195. }
  196. ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
  197. when, 0);
  198. upstream->ev.data = upstream;
  199. ev_timer_start (ctx->event_loop, &upstream->ev);
  200. }
  201. cur = g_list_next (cur);
  202. }
  203. }
  204. }
  205. static void
  206. rspamd_upstream_ctx_dtor (struct upstream_ctx *ctx)
  207. {
  208. GList *cur;
  209. struct upstream *u;
  210. cur = ctx->upstreams->head;
  211. while (cur) {
  212. u = cur->data;
  213. u->ctx = NULL;
  214. u->ctx_pos = NULL;
  215. cur = g_list_next (cur);
  216. }
  217. g_queue_free (ctx->upstreams);
  218. rspamd_mempool_delete (ctx->pool);
  219. g_free (ctx);
  220. }
  221. void
  222. rspamd_upstreams_library_unref (struct upstream_ctx *ctx)
  223. {
  224. REF_RELEASE (ctx);
  225. }
  226. struct upstream_ctx *
  227. rspamd_upstreams_library_init (void)
  228. {
  229. struct upstream_ctx *ctx;
  230. ctx = g_malloc0 (sizeof (*ctx));
  231. memcpy (&ctx->limits, &default_limits, sizeof (ctx->limits));
  232. ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
  233. "upstreams", 0);
  234. ctx->upstreams = g_queue_new ();
  235. REF_INIT_RETAIN (ctx, rspamd_upstream_ctx_dtor);
  236. return ctx;
  237. }
  238. static gint
  239. rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr)
  240. {
  241. int ret;
  242. switch (rspamd_inet_address_get_af (addr)) {
  243. case AF_UNIX:
  244. ret = 2;
  245. break;
  246. case AF_INET:
  247. ret = 1;
  248. break;
  249. default:
  250. ret = 0;
  251. break;
  252. }
  253. return ret;
  254. }
  255. /*
  256. * Select IPv4 addresses before IPv6
  257. */
  258. static gint
  259. rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
  260. {
  261. const struct upstream_addr_elt *ip1 = *(const struct upstream_addr_elt **)a,
  262. *ip2 = *(const struct upstream_addr_elt **)b;
  263. gint w1, w2;
  264. if (ip1->priority == 0 && ip2->priority == 0) {
  265. w1 = rspamd_upstream_af_to_weight (ip1->addr);
  266. w2 = rspamd_upstream_af_to_weight (ip2->addr);
  267. }
  268. else {
  269. w1 = ip1->priority;
  270. w2 = ip2->priority;
  271. }
  272. /* Inverse order */
  273. return w2 - w1;
  274. }
  275. static void
  276. rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *upstream)
  277. {
  278. RSPAMD_UPSTREAM_LOCK (ls);
  279. g_ptr_array_add (ls->alive, upstream);
  280. upstream->active_idx = ls->alive->len - 1;
  281. if (upstream->ctx && upstream->ctx->configured &&
  282. !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  283. if (ev_can_stop (&upstream->ev)) {
  284. ev_timer_stop (upstream->ctx->event_loop, &upstream->ev);
  285. }
  286. /* Start lazy (or not so lazy) names resolution */
  287. gdouble when;
  288. if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
  289. /* Resolve them immediately ! */
  290. when = 0.0;
  291. }
  292. else {
  293. when = rspamd_time_jitter (upstream->ls->limits->lazy_resolve_time,
  294. upstream->ls->limits->lazy_resolve_time * .1);
  295. }
  296. ev_timer_init (&upstream->ev, rspamd_upstream_lazy_resolve_cb,
  297. when, 0);
  298. upstream->ev.data = upstream;
  299. msg_debug_upstream ("start lazy resolving for %s in %.0f seconds",
  300. upstream->name, when);
  301. ev_timer_start (upstream->ctx->event_loop, &upstream->ev);
  302. }
  303. RSPAMD_UPSTREAM_UNLOCK (ls);
  304. }
  305. static void
  306. rspamd_upstream_addr_elt_dtor (gpointer a)
  307. {
  308. struct upstream_addr_elt *elt = a;
  309. if (elt) {
  310. rspamd_inet_address_free (elt->addr);
  311. g_free (elt);
  312. }
  313. }
  314. static void
  315. rspamd_upstream_update_addrs (struct upstream *upstream)
  316. {
  317. guint addr_cnt, i, port;
  318. gboolean seen_addr, reset_errors = FALSE;
  319. struct upstream_inet_addr_entry *cur, *tmp;
  320. GPtrArray *new_addrs;
  321. struct upstream_addr_elt *addr_elt, *naddr;
  322. /*
  323. * We need first of all get the saved port, since DNS gives us no
  324. * idea about what port has been used previously
  325. */
  326. RSPAMD_UPSTREAM_LOCK (upstream);
  327. if (upstream->addrs.addr->len > 0 && upstream->new_addrs) {
  328. addr_elt = g_ptr_array_index (upstream->addrs.addr, 0);
  329. port = rspamd_inet_address_get_port (addr_elt->addr);
  330. /* Now calculate new addrs count */
  331. addr_cnt = 0;
  332. LL_FOREACH (upstream->new_addrs, cur) {
  333. addr_cnt++;
  334. }
  335. /* At 10% probability reset errors on addr elements */
  336. if (rspamd_random_double_fast () > 0.9) {
  337. reset_errors = TRUE;
  338. msg_debug_upstream ("reset errors on upstream %s",
  339. upstream->name);
  340. }
  341. new_addrs = g_ptr_array_new_full (addr_cnt, rspamd_upstream_addr_elt_dtor);
  342. /* Copy addrs back */
  343. LL_FOREACH (upstream->new_addrs, cur) {
  344. seen_addr = FALSE;
  345. naddr = NULL;
  346. /* Ports are problematic, set to compare in the next block */
  347. rspamd_inet_address_set_port (cur->addr, port);
  348. PTR_ARRAY_FOREACH (upstream->addrs.addr, i, addr_elt) {
  349. if (rspamd_inet_address_compare (addr_elt->addr, cur->addr, FALSE) == 0) {
  350. naddr = g_malloc0 (sizeof (*naddr));
  351. naddr->addr = cur->addr;
  352. naddr->errors = reset_errors ? 0 : addr_elt->errors;
  353. seen_addr = TRUE;
  354. break;
  355. }
  356. }
  357. if (!seen_addr) {
  358. naddr = g_malloc0 (sizeof (*naddr));
  359. naddr->addr = cur->addr;
  360. naddr->errors = 0;
  361. msg_debug_upstream ("new address for %s: %s",
  362. upstream->name,
  363. rspamd_inet_address_to_string_pretty (naddr->addr));
  364. }
  365. else {
  366. msg_debug_upstream ("existing address for %s: %s",
  367. upstream->name,
  368. rspamd_inet_address_to_string_pretty (cur->addr));
  369. }
  370. g_ptr_array_add (new_addrs, naddr);
  371. }
  372. /* Free old addresses */
  373. g_ptr_array_free (upstream->addrs.addr, TRUE);
  374. upstream->addrs.cur = 0;
  375. upstream->addrs.addr = new_addrs;
  376. g_ptr_array_sort (upstream->addrs.addr, rspamd_upstream_addr_sort_func);
  377. }
  378. LL_FOREACH_SAFE (upstream->new_addrs, cur, tmp) {
  379. /* Do not free inet address pointer since it has been transferred to up */
  380. g_free (cur);
  381. }
  382. upstream->new_addrs = NULL;
  383. RSPAMD_UPSTREAM_UNLOCK (upstream);
  384. }
  385. static void
  386. rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
  387. {
  388. struct upstream *up = (struct upstream *)arg;
  389. struct rdns_reply_entry *entry;
  390. struct upstream_inet_addr_entry *up_ent;
  391. if (reply->code == RDNS_RC_NOERROR) {
  392. entry = reply->entries;
  393. RSPAMD_UPSTREAM_LOCK (up);
  394. while (entry) {
  395. if (entry->type == RDNS_REQUEST_A) {
  396. up_ent = g_malloc0 (sizeof (*up_ent));
  397. up_ent->addr = rspamd_inet_address_new (AF_INET,
  398. &entry->content.a.addr);
  399. LL_PREPEND (up->new_addrs, up_ent);
  400. }
  401. else if (entry->type == RDNS_REQUEST_AAAA) {
  402. up_ent = g_malloc0 (sizeof (*up_ent));
  403. up_ent->addr = rspamd_inet_address_new (AF_INET6,
  404. &entry->content.aaa.addr);
  405. LL_PREPEND (up->new_addrs, up_ent);
  406. }
  407. entry = entry->next;
  408. }
  409. RSPAMD_UPSTREAM_UNLOCK (up);
  410. }
  411. up->dns_requests--;
  412. if (up->dns_requests == 0) {
  413. rspamd_upstream_update_addrs (up);
  414. }
  415. REF_RELEASE (up);
  416. }
  417. struct rspamd_upstream_srv_dns_cb {
  418. struct upstream *up;
  419. guint priority;
  420. guint port;
  421. guint requests_inflight;
  422. };
  423. /* Used when we have resolved SRV record and resolved addrs */
  424. static void
  425. rspamd_upstream_dns_srv_phase2_cb (struct rdns_reply *reply, void *arg)
  426. {
  427. struct rspamd_upstream_srv_dns_cb *cbdata =
  428. (struct rspamd_upstream_srv_dns_cb *)arg;
  429. struct upstream *up;
  430. struct rdns_reply_entry *entry;
  431. struct upstream_inet_addr_entry *up_ent;
  432. up = cbdata->up;
  433. if (reply->code == RDNS_RC_NOERROR) {
  434. entry = reply->entries;
  435. RSPAMD_UPSTREAM_LOCK (up);
  436. while (entry) {
  437. if (entry->type == RDNS_REQUEST_A) {
  438. up_ent = g_malloc0 (sizeof (*up_ent));
  439. up_ent->addr = rspamd_inet_address_new (AF_INET,
  440. &entry->content.a.addr);
  441. up_ent->priority = cbdata->priority;
  442. rspamd_inet_address_set_port (up_ent->addr, cbdata->port);
  443. LL_PREPEND (up->new_addrs, up_ent);
  444. }
  445. else if (entry->type == RDNS_REQUEST_AAAA) {
  446. up_ent = g_malloc0 (sizeof (*up_ent));
  447. up_ent->addr = rspamd_inet_address_new (AF_INET6,
  448. &entry->content.aaa.addr);
  449. up_ent->priority = cbdata->priority;
  450. rspamd_inet_address_set_port (up_ent->addr, cbdata->port);
  451. LL_PREPEND (up->new_addrs, up_ent);
  452. }
  453. entry = entry->next;
  454. }
  455. RSPAMD_UPSTREAM_UNLOCK (up);
  456. }
  457. up->dns_requests--;
  458. cbdata->requests_inflight --;
  459. if (cbdata->requests_inflight == 0) {
  460. g_free (cbdata);
  461. }
  462. if (up->dns_requests == 0) {
  463. rspamd_upstream_update_addrs (up);
  464. }
  465. REF_RELEASE (up);
  466. }
  467. static void
  468. rspamd_upstream_dns_srv_cb (struct rdns_reply *reply, void *arg)
  469. {
  470. struct upstream *upstream = (struct upstream *) arg;
  471. struct rdns_reply_entry *entry;
  472. struct rspamd_upstream_srv_dns_cb *ncbdata;
  473. if (reply->code == RDNS_RC_NOERROR) {
  474. entry = reply->entries;
  475. RSPAMD_UPSTREAM_LOCK (upstream);
  476. while (entry) {
  477. /* XXX: we ignore weight as it contradicts with upstreams logic */
  478. if (entry->type == RDNS_REQUEST_SRV) {
  479. msg_debug_upstream ("got srv reply for %s: %s "
  480. "(weight=%d, priority=%d, port=%d)",
  481. upstream->name, entry->content.srv.target,
  482. entry->content.srv.weight, entry->content.srv.priority,
  483. entry->content.srv.port);
  484. ncbdata = g_malloc0 (sizeof (*ncbdata));
  485. ncbdata->priority = entry->content.srv.weight;
  486. ncbdata->port = entry->content.srv.port;
  487. /* XXX: for all entries? */
  488. upstream->ttl = entry->ttl;
  489. if (rdns_make_request_full (upstream->ctx->res,
  490. rspamd_upstream_dns_srv_phase2_cb, ncbdata,
  491. upstream->ls->limits->dns_timeout,
  492. upstream->ls->limits->dns_retransmits,
  493. 1, entry->content.srv.target, RDNS_REQUEST_A) != NULL) {
  494. upstream->dns_requests++;
  495. REF_RETAIN (upstream);
  496. ncbdata->requests_inflight ++;
  497. }
  498. if (rdns_make_request_full (upstream->ctx->res,
  499. rspamd_upstream_dns_srv_phase2_cb, ncbdata,
  500. upstream->ls->limits->dns_timeout,
  501. upstream->ls->limits->dns_retransmits,
  502. 1, entry->content.srv.target, RDNS_REQUEST_AAAA) != NULL) {
  503. upstream->dns_requests++;
  504. REF_RETAIN (upstream);
  505. ncbdata->requests_inflight ++;
  506. }
  507. if (ncbdata->requests_inflight == 0) {
  508. g_free (ncbdata);
  509. }
  510. }
  511. entry = entry->next;
  512. }
  513. RSPAMD_UPSTREAM_UNLOCK (upstream);
  514. }
  515. upstream->dns_requests--;
  516. REF_RELEASE (upstream);
  517. }
  518. static void
  519. rspamd_upstream_revive_cb (struct ev_loop *loop, ev_timer *w, int revents)
  520. {
  521. struct upstream *upstream = (struct upstream *)w->data;
  522. RSPAMD_UPSTREAM_LOCK (upstream);
  523. ev_timer_stop (loop, w);
  524. msg_debug_upstream ("revive upstream %s", upstream->name);
  525. if (upstream->ls) {
  526. rspamd_upstream_set_active (upstream->ls, upstream);
  527. }
  528. RSPAMD_UPSTREAM_UNLOCK (upstream);
  529. g_assert (upstream->ref.refcount > 1);
  530. REF_RELEASE (upstream);
  531. }
  532. static void
  533. rspamd_upstream_resolve_addrs (const struct upstream_list *ls,
  534. struct upstream *upstream)
  535. {
  536. /* XXX: maybe make it configurable */
  537. static const gdouble min_resolve_interval = 60.0;
  538. if (upstream->ctx->res != NULL &&
  539. upstream->ctx->configured &&
  540. upstream->dns_requests == 0 &&
  541. !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  542. gdouble now = ev_now (upstream->ctx->event_loop);
  543. if (now - upstream->last_resolve < min_resolve_interval) {
  544. msg_info_upstream ("do not resolve upstream %s as it was checked %.0f "
  545. "seconds ago (%.0f is minimum)",
  546. upstream->name, now - upstream->last_resolve,
  547. min_resolve_interval);
  548. return;
  549. }
  550. /* Resolve name of the upstream one more time */
  551. if (upstream->name[0] != '/') {
  552. upstream->last_resolve = now;
  553. if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
  554. if (rdns_make_request_full (upstream->ctx->res,
  555. rspamd_upstream_dns_srv_cb, upstream,
  556. ls->limits->dns_timeout, ls->limits->dns_retransmits,
  557. 1, upstream->name, RDNS_REQUEST_SRV) != NULL) {
  558. upstream->dns_requests++;
  559. REF_RETAIN (upstream);
  560. }
  561. }
  562. else {
  563. if (rdns_make_request_full (upstream->ctx->res,
  564. rspamd_upstream_dns_cb, upstream,
  565. ls->limits->dns_timeout, ls->limits->dns_retransmits,
  566. 1, upstream->name, RDNS_REQUEST_A) != NULL) {
  567. upstream->dns_requests++;
  568. REF_RETAIN (upstream);
  569. }
  570. if (rdns_make_request_full (upstream->ctx->res,
  571. rspamd_upstream_dns_cb, upstream,
  572. ls->limits->dns_timeout, ls->limits->dns_retransmits,
  573. 1, upstream->name, RDNS_REQUEST_AAAA) != NULL) {
  574. upstream->dns_requests++;
  575. REF_RETAIN (upstream);
  576. }
  577. }
  578. }
  579. }
  580. else if (upstream->dns_requests != 0) {
  581. msg_info_upstream ("do not resolve upstream %s as another request for "
  582. "resolving has been already issued",
  583. upstream->name);
  584. }
  585. }
  586. static void
  587. rspamd_upstream_lazy_resolve_cb (struct ev_loop *loop, ev_timer *w, int revents)
  588. {
  589. struct upstream *up = (struct upstream *)w->data;
  590. RSPAMD_UPSTREAM_LOCK (up);
  591. ev_timer_stop (loop, w);
  592. if (up->ls) {
  593. rspamd_upstream_resolve_addrs (up->ls, up);
  594. if (up->ttl == 0 || up->ttl > up->ls->limits->lazy_resolve_time) {
  595. w->repeat = rspamd_time_jitter (up->ls->limits->lazy_resolve_time,
  596. up->ls->limits->lazy_resolve_time * .1);
  597. }
  598. else {
  599. w->repeat = up->ttl;
  600. }
  601. ev_timer_again (loop, w);
  602. }
  603. RSPAMD_UPSTREAM_UNLOCK (up);
  604. }
  605. static void
  606. rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *upstream)
  607. {
  608. gdouble ntim;
  609. guint i;
  610. struct upstream *cur;
  611. struct upstream_list_watcher *w;
  612. RSPAMD_UPSTREAM_LOCK (ls);
  613. g_ptr_array_remove_index (ls->alive, upstream->active_idx);
  614. upstream->active_idx = -1;
  615. /* We need to update all indicies */
  616. for (i = 0; i < ls->alive->len; i ++) {
  617. cur = g_ptr_array_index (ls->alive, i);
  618. cur->active_idx = i;
  619. }
  620. if (upstream->ctx) {
  621. rspamd_upstream_resolve_addrs (ls, upstream);
  622. REF_RETAIN (upstream);
  623. ntim = rspamd_time_jitter (ls->limits->revive_time,
  624. ls->limits->revive_time * ls->limits->revive_jitter);
  625. if (ev_can_stop (&upstream->ev)) {
  626. ev_timer_stop (upstream->ctx->event_loop, &upstream->ev);
  627. }
  628. msg_debug_upstream ("mark upstream %s inactive; revive in %.0f seconds",
  629. upstream->name, ntim);
  630. ev_timer_init (&upstream->ev, rspamd_upstream_revive_cb, ntim, 0);
  631. upstream->ev.data = upstream;
  632. if (upstream->ctx->event_loop != NULL && upstream->ctx->configured) {
  633. ev_timer_start (upstream->ctx->event_loop, &upstream->ev);
  634. }
  635. }
  636. DL_FOREACH (upstream->ls->watchers, w) {
  637. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
  638. w->func (upstream, RSPAMD_UPSTREAM_WATCH_OFFLINE, upstream->errors, w->ud);
  639. }
  640. }
  641. RSPAMD_UPSTREAM_UNLOCK (ls);
  642. }
  643. void
  644. rspamd_upstream_fail (struct upstream *upstream,
  645. gboolean addr_failure,
  646. const gchar *reason)
  647. {
  648. gdouble error_rate = 0, max_error_rate = 0;
  649. gdouble sec_last, sec_cur;
  650. struct upstream_addr_elt *addr_elt;
  651. struct upstream_list_watcher *w;
  652. msg_debug_upstream ("upstream %s failed; reason: %s",
  653. upstream->name,
  654. reason);
  655. if (upstream->ctx && upstream->active_idx != -1 && upstream->ls) {
  656. sec_cur = rspamd_get_ticks (FALSE);
  657. RSPAMD_UPSTREAM_LOCK (upstream);
  658. if (upstream->errors == 0) {
  659. /* We have the first error */
  660. upstream->last_fail = sec_cur;
  661. upstream->errors = 1;
  662. if (upstream->ls && upstream->dns_requests == 0) {
  663. /* Try to re-resolve address immediately */
  664. rspamd_upstream_resolve_addrs (upstream->ls, upstream);
  665. }
  666. DL_FOREACH (upstream->ls->watchers, w) {
  667. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
  668. w->func (upstream, RSPAMD_UPSTREAM_WATCH_FAILURE, 1, w->ud);
  669. }
  670. }
  671. }
  672. else {
  673. sec_last = upstream->last_fail;
  674. if (sec_cur >= sec_last) {
  675. upstream->errors ++;
  676. DL_FOREACH (upstream->ls->watchers, w) {
  677. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
  678. w->func (upstream, RSPAMD_UPSTREAM_WATCH_FAILURE,
  679. upstream->errors, w->ud);
  680. }
  681. }
  682. if (sec_cur - sec_last >= upstream->ls->limits->error_time) {
  683. error_rate = ((gdouble)upstream->errors) / (sec_cur - sec_last);
  684. max_error_rate = ((gdouble)upstream->ls->limits->max_errors) /
  685. upstream->ls->limits->error_time;
  686. }
  687. if (error_rate > max_error_rate) {
  688. /* Remove upstream from the active list */
  689. if (upstream->ls->ups->len > 1) {
  690. msg_debug_upstream ("mark upstream %s inactive; "
  691. "reason: %s; %.2f "
  692. "error rate (%d errors), "
  693. "%.2f max error rate, "
  694. "%.1f first error time, "
  695. "%.1f current ts, "
  696. "%d upstreams left",
  697. upstream->name,
  698. reason,
  699. error_rate,
  700. upstream->errors,
  701. max_error_rate,
  702. sec_last,
  703. sec_cur,
  704. upstream->ls->alive->len - 1);
  705. rspamd_upstream_set_inactive (upstream->ls, upstream);
  706. upstream->errors = 0;
  707. }
  708. else {
  709. msg_debug_upstream ("cannot mark last alive upstream %s "
  710. "inactive; reason: %s; %.2f "
  711. "error rate (%d errors), "
  712. "%.2f max error rate, "
  713. "%.1f first error time, "
  714. "%.1f current ts",
  715. upstream->name,
  716. reason,
  717. error_rate,
  718. upstream->errors,
  719. max_error_rate,
  720. sec_last,
  721. sec_cur);
  722. /* Just re-resolve addresses */
  723. if (sec_cur - sec_last > upstream->ls->limits->revive_time) {
  724. upstream->errors = 0;
  725. rspamd_upstream_resolve_addrs (upstream->ls, upstream);
  726. }
  727. }
  728. }
  729. else if (sec_cur - sec_last >= upstream->ls->limits->error_time) {
  730. /* Forget the whole interval */
  731. upstream->last_fail = sec_cur;
  732. upstream->errors = 1;
  733. }
  734. }
  735. }
  736. if (addr_failure) {
  737. /* Also increase count of errors for this specific address */
  738. if (upstream->addrs.addr) {
  739. addr_elt = g_ptr_array_index (upstream->addrs.addr,
  740. upstream->addrs.cur);
  741. addr_elt->errors++;
  742. }
  743. }
  744. RSPAMD_UPSTREAM_UNLOCK (upstream);
  745. }
  746. }
  747. void
  748. rspamd_upstream_ok (struct upstream *upstream)
  749. {
  750. struct upstream_addr_elt *addr_elt;
  751. struct upstream_list_watcher *w;
  752. RSPAMD_UPSTREAM_LOCK (upstream);
  753. if (upstream->errors > 0 && upstream->active_idx != -1 && upstream->ls) {
  754. /* We touch upstream if and only if it is active */
  755. msg_debug_upstream ("reset errors on upstream %s (was %ud)", upstream->name, upstream->errors);
  756. upstream->errors = 0;
  757. if (upstream->addrs.addr) {
  758. addr_elt = g_ptr_array_index (upstream->addrs.addr, upstream->addrs.cur);
  759. addr_elt->errors = 0;
  760. }
  761. DL_FOREACH (upstream->ls->watchers, w) {
  762. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
  763. w->func (upstream, RSPAMD_UPSTREAM_WATCH_SUCCESS, 0, w->ud);
  764. }
  765. }
  766. }
  767. RSPAMD_UPSTREAM_UNLOCK (upstream);
  768. }
  769. void
  770. rspamd_upstream_set_weight (struct upstream *up, guint weight)
  771. {
  772. RSPAMD_UPSTREAM_LOCK (up);
  773. up->weight = weight;
  774. RSPAMD_UPSTREAM_UNLOCK (up);
  775. }
  776. #define SEED_CONSTANT 0xa574de7df64e9b9dULL
  777. struct upstream_list*
  778. rspamd_upstreams_create (struct upstream_ctx *ctx)
  779. {
  780. struct upstream_list *ls;
  781. ls = g_malloc0 (sizeof (*ls));
  782. ls->hash_seed = SEED_CONSTANT;
  783. ls->ups = g_ptr_array_new ();
  784. ls->alive = g_ptr_array_new ();
  785. #ifdef UPSTREAMS_THREAD_SAFE
  786. ls->lock = rspamd_mutex_new ();
  787. #endif
  788. ls->cur_elt = 0;
  789. ls->ctx = ctx;
  790. ls->rot_alg = RSPAMD_UPSTREAM_UNDEF;
  791. if (ctx) {
  792. ls->limits = &ctx->limits;
  793. }
  794. else {
  795. ls->limits = &default_limits;
  796. }
  797. return ls;
  798. }
  799. gsize
  800. rspamd_upstreams_count (struct upstream_list *ups)
  801. {
  802. return ups != NULL ? ups->ups->len : 0;
  803. }
  804. gsize
  805. rspamd_upstreams_alive (struct upstream_list *ups)
  806. {
  807. return ups != NULL ? ups->alive->len : 0;
  808. }
  809. static void
  810. rspamd_upstream_dtor (struct upstream *up)
  811. {
  812. struct upstream_inet_addr_entry *cur, *tmp;
  813. if (up->new_addrs) {
  814. LL_FOREACH_SAFE(up->new_addrs, cur, tmp) {
  815. /* Here we need to free pointer as well */
  816. rspamd_inet_address_free (cur->addr);
  817. g_free (cur);
  818. }
  819. }
  820. if (up->addrs.addr) {
  821. g_ptr_array_free (up->addrs.addr, TRUE);
  822. }
  823. #ifdef UPSTREAMS_THREAD_SAFE
  824. rspamd_mutex_free (up->lock);
  825. #endif
  826. if (up->ctx) {
  827. if (ev_can_stop (&up->ev)) {
  828. ev_timer_stop (up->ctx->event_loop, &up->ev);
  829. }
  830. g_queue_delete_link (up->ctx->upstreams, up->ctx_pos);
  831. REF_RELEASE (up->ctx);
  832. }
  833. g_free (up);
  834. }
  835. rspamd_inet_addr_t*
  836. rspamd_upstream_addr_next (struct upstream *up)
  837. {
  838. guint idx, next_idx;
  839. struct upstream_addr_elt *e1, *e2;
  840. do {
  841. idx = up->addrs.cur;
  842. next_idx = (idx + 1) % up->addrs.addr->len;
  843. e1 = g_ptr_array_index (up->addrs.addr, idx);
  844. e2 = g_ptr_array_index (up->addrs.addr, next_idx);
  845. up->addrs.cur = next_idx;
  846. } while (e2->errors > e1->errors);
  847. return e2->addr;
  848. }
  849. rspamd_inet_addr_t*
  850. rspamd_upstream_addr_cur (const struct upstream *up)
  851. {
  852. struct upstream_addr_elt *elt;
  853. elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
  854. return elt->addr;
  855. }
  856. const gchar*
  857. rspamd_upstream_name (struct upstream *up)
  858. {
  859. return up->name;
  860. }
  861. gint
  862. rspamd_upstream_port (struct upstream *up)
  863. {
  864. struct upstream_addr_elt *elt;
  865. elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
  866. return rspamd_inet_address_get_port (elt->addr);
  867. }
  868. gboolean
  869. rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
  870. guint16 def_port, enum rspamd_upstream_parse_type parse_type,
  871. void *data)
  872. {
  873. struct upstream *upstream;
  874. GPtrArray *addrs = NULL;
  875. guint i, slen;
  876. rspamd_inet_addr_t *addr;
  877. enum rspamd_parse_host_port_result ret = RSPAMD_PARSE_ADDR_FAIL;
  878. upstream = g_malloc0 (sizeof (*upstream));
  879. slen = strlen (str);
  880. switch (parse_type) {
  881. case RSPAMD_UPSTREAM_PARSE_DEFAULT:
  882. if (slen > sizeof ("service=") &&
  883. RSPAMD_LEN_CHECK_STARTS_WITH (str, slen, "service=")) {
  884. const gchar *plus_pos, *service_pos, *semicolon_pos;
  885. /* Accept service=srv_name+hostname[:priority] */
  886. service_pos = str + sizeof ("service=") - 1;
  887. plus_pos = strchr (service_pos, '+');
  888. if (plus_pos != NULL) {
  889. semicolon_pos = strchr (plus_pos + 1, ':');
  890. if (semicolon_pos) {
  891. upstream->weight = strtoul (semicolon_pos + 1, NULL, 10);
  892. }
  893. else {
  894. semicolon_pos = plus_pos + strlen (plus_pos);
  895. }
  896. /*
  897. * Now our name is _service._tcp.<domain>
  898. * where <domain> is string between semicolon_pos and plus_pos +1
  899. * while service is a string between service_pos and plus_pos
  900. */
  901. guint namelen = (semicolon_pos - (plus_pos + 1)) +
  902. (plus_pos - service_pos) +
  903. (sizeof ("tcp") - 1) +
  904. 4;
  905. addrs = g_ptr_array_sized_new (1);
  906. upstream->name = ups->ctx ?
  907. rspamd_mempool_alloc (ups->ctx->pool, namelen + 1) :
  908. g_malloc (namelen + 1);
  909. rspamd_snprintf (upstream->name, namelen + 1,
  910. "_%*s._tcp.%*s",
  911. (gint)(plus_pos - service_pos), service_pos,
  912. (gint)(semicolon_pos - (plus_pos + 1)), plus_pos + 1);
  913. upstream->flags |= RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE;
  914. ret = RSPAMD_PARSE_ADDR_RESOLVED;
  915. }
  916. }
  917. else {
  918. ret = rspamd_parse_host_port_priority (str, &addrs,
  919. &upstream->weight,
  920. &upstream->name, def_port,
  921. FALSE,
  922. ups->ctx ? ups->ctx->pool : NULL);
  923. }
  924. break;
  925. case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
  926. addrs = g_ptr_array_sized_new (1);
  927. if (rspamd_parse_inet_address (&addr, str, strlen (str),
  928. RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
  929. if (ups->ctx) {
  930. upstream->name = rspamd_mempool_strdup (ups->ctx->pool, str);
  931. }
  932. else {
  933. upstream->name = g_strdup (str);
  934. }
  935. if (rspamd_inet_address_get_port (addr) == 0) {
  936. rspamd_inet_address_set_port (addr, def_port);
  937. }
  938. g_ptr_array_add (addrs, addr);
  939. ret = RSPAMD_PARSE_ADDR_NUMERIC;
  940. if (ups->ctx) {
  941. rspamd_mempool_add_destructor (ups->ctx->pool,
  942. (rspamd_mempool_destruct_t) rspamd_inet_address_free,
  943. addr);
  944. rspamd_mempool_add_destructor (ups->ctx->pool,
  945. (rspamd_mempool_destruct_t) rspamd_ptr_array_free_hard,
  946. addrs);
  947. }
  948. }
  949. else {
  950. g_ptr_array_free (addrs, TRUE);
  951. }
  952. break;
  953. }
  954. if (ret == RSPAMD_PARSE_ADDR_FAIL) {
  955. g_free (upstream);
  956. return FALSE;
  957. }
  958. else {
  959. upstream->flags |= ups->flags;
  960. if (ret == RSPAMD_PARSE_ADDR_NUMERIC) {
  961. /* Add noresolve flag */
  962. upstream->flags |= RSPAMD_UPSTREAM_FLAG_NORESOLVE;
  963. }
  964. for (i = 0; i < addrs->len; i ++) {
  965. addr = g_ptr_array_index (addrs, i);
  966. rspamd_upstream_add_addr (upstream, rspamd_inet_address_copy (addr));
  967. }
  968. }
  969. if (upstream->weight == 0 && ups->rot_alg == RSPAMD_UPSTREAM_MASTER_SLAVE) {
  970. /* Special heuristic for master-slave rotation */
  971. if (ups->ups->len == 0) {
  972. /* Prioritize the first */
  973. upstream->weight = 1;
  974. }
  975. }
  976. g_ptr_array_add (ups->ups, upstream);
  977. upstream->ud = data;
  978. upstream->cur_weight = upstream->weight;
  979. upstream->ls = ups;
  980. REF_INIT_RETAIN (upstream, rspamd_upstream_dtor);
  981. #ifdef UPSTREAMS_THREAD_SAFE
  982. upstream->lock = rspamd_mutex_new ();
  983. #endif
  984. upstream->ctx = ups->ctx;
  985. if (upstream->ctx) {
  986. REF_RETAIN (ups->ctx);
  987. g_queue_push_tail (ups->ctx->upstreams, upstream);
  988. upstream->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams);
  989. }
  990. guint h = rspamd_cryptobox_fast_hash (upstream->name,
  991. strlen (upstream->name), 0);
  992. memset (upstream->uid, 0, sizeof (upstream->uid));
  993. rspamd_encode_base32_buf ((const guchar *) &h, sizeof (h),
  994. upstream->uid, sizeof (upstream->uid) - 1, RSPAMD_BASE32_DEFAULT);
  995. msg_debug_upstream ("added upstream %s (%s)", upstream->name,
  996. upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : "DNS name");
  997. g_ptr_array_sort (upstream->addrs.addr, rspamd_upstream_addr_sort_func);
  998. rspamd_upstream_set_active (ups, upstream);
  999. return TRUE;
  1000. }
  1001. void
  1002. rspamd_upstreams_set_flags (struct upstream_list *ups,
  1003. enum rspamd_upstream_flag flags)
  1004. {
  1005. ups->flags = flags;
  1006. }
  1007. void
  1008. rspamd_upstreams_set_rotation (struct upstream_list *ups,
  1009. enum rspamd_upstream_rotation rot)
  1010. {
  1011. ups->rot_alg = rot;
  1012. }
  1013. gboolean
  1014. rspamd_upstream_add_addr (struct upstream *up, rspamd_inet_addr_t *addr)
  1015. {
  1016. struct upstream_addr_elt *elt;
  1017. /*
  1018. * XXX: slow and inefficient
  1019. */
  1020. if (up->addrs.addr == NULL) {
  1021. up->addrs.addr = g_ptr_array_new_full (8, rspamd_upstream_addr_elt_dtor);
  1022. }
  1023. elt = g_malloc0 (sizeof (*elt));
  1024. elt->addr = addr;
  1025. g_ptr_array_add (up->addrs.addr, elt);
  1026. g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
  1027. return TRUE;
  1028. }
  1029. gboolean
  1030. rspamd_upstreams_parse_line_len (struct upstream_list *ups,
  1031. const gchar *str, gsize len, guint16 def_port, void *data)
  1032. {
  1033. const gchar *end = str + len, *p = str;
  1034. const gchar *separators = ";, \n\r\t";
  1035. gchar *tmp;
  1036. guint span_len;
  1037. gboolean ret = FALSE;
  1038. if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "random:")) {
  1039. ups->rot_alg = RSPAMD_UPSTREAM_RANDOM;
  1040. p += sizeof ("random:") - 1;
  1041. }
  1042. else if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "master-slave:")) {
  1043. ups->rot_alg = RSPAMD_UPSTREAM_MASTER_SLAVE;
  1044. p += sizeof ("master-slave:") - 1;
  1045. }
  1046. else if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "round-robin:")) {
  1047. ups->rot_alg = RSPAMD_UPSTREAM_ROUND_ROBIN;
  1048. p += sizeof ("round-robin:") - 1;
  1049. }
  1050. else if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "hash:")) {
  1051. ups->rot_alg = RSPAMD_UPSTREAM_HASHED;
  1052. p += sizeof ("hash:") - 1;
  1053. }
  1054. while (p < end) {
  1055. span_len = rspamd_memcspn (p, separators, end - p);
  1056. if (span_len > 0) {
  1057. tmp = g_malloc (span_len + 1);
  1058. rspamd_strlcpy (tmp, p, span_len + 1);
  1059. if (rspamd_upstreams_add_upstream (ups, tmp, def_port,
  1060. RSPAMD_UPSTREAM_PARSE_DEFAULT,
  1061. data)) {
  1062. ret = TRUE;
  1063. }
  1064. g_free (tmp);
  1065. }
  1066. p += span_len;
  1067. /* Skip separators */
  1068. if (p < end) {
  1069. p += rspamd_memspn (p, separators, end - p);
  1070. }
  1071. }
  1072. if (!ups->ups_line) {
  1073. ups->ups_line = g_malloc (len + 1);
  1074. rspamd_strlcpy (ups->ups_line, str, len + 1);
  1075. }
  1076. return ret;
  1077. }
  1078. gboolean
  1079. rspamd_upstreams_parse_line (struct upstream_list *ups,
  1080. const gchar *str, guint16 def_port, void *data)
  1081. {
  1082. return rspamd_upstreams_parse_line_len (ups, str, strlen (str),
  1083. def_port, data);
  1084. }
  1085. gboolean
  1086. rspamd_upstreams_from_ucl (struct upstream_list *ups,
  1087. const ucl_object_t *in, guint16 def_port, void *data)
  1088. {
  1089. gboolean ret = FALSE;
  1090. const ucl_object_t *cur;
  1091. ucl_object_iter_t it = NULL;
  1092. it = ucl_object_iterate_new (in);
  1093. while ((cur = ucl_object_iterate_safe (it, true)) != NULL) {
  1094. if (ucl_object_type (cur) == UCL_STRING) {
  1095. ret = rspamd_upstreams_parse_line (ups, ucl_object_tostring (cur),
  1096. def_port, data);
  1097. }
  1098. }
  1099. ucl_object_iterate_free (it);
  1100. return ret;
  1101. }
  1102. void
  1103. rspamd_upstreams_destroy (struct upstream_list *ups)
  1104. {
  1105. guint i;
  1106. struct upstream *up;
  1107. struct upstream_list_watcher *w, *tmp;
  1108. if (ups != NULL) {
  1109. g_ptr_array_free (ups->alive, TRUE);
  1110. for (i = 0; i < ups->ups->len; i ++) {
  1111. up = g_ptr_array_index (ups->ups, i);
  1112. up->ls = NULL;
  1113. REF_RELEASE (up);
  1114. }
  1115. DL_FOREACH_SAFE (ups->watchers, w, tmp) {
  1116. if (w->dtor) {
  1117. w->dtor (w->ud);
  1118. }
  1119. g_free (w);
  1120. }
  1121. g_free (ups->ups_line);
  1122. g_ptr_array_free (ups->ups, TRUE);
  1123. #ifdef UPSTREAMS_THREAD_SAFE
  1124. rspamd_mutex_free (ups->lock);
  1125. #endif
  1126. g_free (ups);
  1127. }
  1128. }
  1129. static void
  1130. rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
  1131. {
  1132. struct upstream *up = (struct upstream *)elt;
  1133. struct upstream_list *ups = (struct upstream_list *)ls;
  1134. struct upstream_list_watcher *w;
  1135. /* Here the upstreams list is already locked */
  1136. RSPAMD_UPSTREAM_LOCK (up);
  1137. if (ev_can_stop (&up->ev)) {
  1138. ev_timer_stop (up->ctx->event_loop, &up->ev);
  1139. }
  1140. g_ptr_array_add (ups->alive, up);
  1141. up->active_idx = ups->alive->len - 1;
  1142. RSPAMD_UPSTREAM_UNLOCK (up);
  1143. DL_FOREACH (up->ls->watchers, w) {
  1144. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
  1145. w->func (up, RSPAMD_UPSTREAM_WATCH_ONLINE, up->errors, w->ud);
  1146. }
  1147. }
  1148. /* For revive event */
  1149. g_assert (up->ref.refcount > 1);
  1150. REF_RELEASE (up);
  1151. }
  1152. static struct upstream*
  1153. rspamd_upstream_get_random (struct upstream_list *ups,
  1154. struct upstream *except)
  1155. {
  1156. for (;;) {
  1157. guint idx = ottery_rand_range (ups->alive->len - 1);
  1158. struct upstream *up;
  1159. up = g_ptr_array_index (ups->alive, idx);
  1160. if (except && up == except) {
  1161. continue;
  1162. }
  1163. return up;
  1164. }
  1165. }
  1166. static struct upstream*
  1167. rspamd_upstream_get_round_robin (struct upstream_list *ups,
  1168. struct upstream *except,
  1169. gboolean use_cur)
  1170. {
  1171. guint max_weight = 0, min_checked = G_MAXUINT;
  1172. struct upstream *up = NULL, *selected = NULL, *min_checked_sel = NULL;
  1173. guint i;
  1174. /* Select upstream with the maximum cur_weight */
  1175. RSPAMD_UPSTREAM_LOCK (ups);
  1176. for (i = 0; i < ups->alive->len; i ++) {
  1177. up = g_ptr_array_index (ups->alive, i);
  1178. if (except != NULL && up == except) {
  1179. continue;
  1180. }
  1181. if (use_cur) {
  1182. if (up->cur_weight > max_weight) {
  1183. selected = up;
  1184. max_weight = up->cur_weight;
  1185. }
  1186. }
  1187. else {
  1188. if (up->weight > max_weight) {
  1189. selected = up;
  1190. max_weight = up->weight;
  1191. }
  1192. }
  1193. /*
  1194. * This code is used when all upstreams have zero weight
  1195. * The logic is to select least currently used upstream and penalise
  1196. * upstream with errors. The error penalty should no be too high
  1197. * to avoid sudden traffic drop in this case.
  1198. */
  1199. if (up->checked + up->errors * 2 < min_checked) {
  1200. min_checked_sel = up;
  1201. min_checked = up->checked;
  1202. }
  1203. }
  1204. if (max_weight == 0) {
  1205. /* All upstreams have zero weight */
  1206. if (min_checked > G_MAXUINT / 2) {
  1207. /* Reset all checked counters to avoid overflow */
  1208. for (i = 0; i < ups->alive->len; i ++) {
  1209. up = g_ptr_array_index (ups->alive, i);
  1210. up->checked = 0;
  1211. }
  1212. }
  1213. selected = min_checked_sel;
  1214. }
  1215. if (use_cur && selected) {
  1216. if (selected->cur_weight > 0) {
  1217. selected->cur_weight--;
  1218. }
  1219. else {
  1220. selected->cur_weight = selected->weight;
  1221. }
  1222. }
  1223. RSPAMD_UPSTREAM_UNLOCK (ups);
  1224. return selected;
  1225. }
  1226. /*
  1227. * The key idea of this function is obtained from the following paper:
  1228. * A Fast, Minimal Memory, Consistent Hash Algorithm
  1229. * John Lamping, Eric Veach
  1230. *
  1231. * http://arxiv.org/abs/1406.2294
  1232. */
  1233. static guint32
  1234. rspamd_consistent_hash (guint64 key, guint32 nbuckets)
  1235. {
  1236. gint64 b = -1, j = 0;
  1237. while (j < nbuckets) {
  1238. b = j;
  1239. key *= 2862933555777941757ULL + 1;
  1240. j = (b + 1) * (double)(1ULL << 31) / (double)((key >> 33) + 1ULL);
  1241. }
  1242. return b;
  1243. }
  1244. static struct upstream*
  1245. rspamd_upstream_get_hashed (struct upstream_list *ups,
  1246. struct upstream *except,
  1247. const guint8 *key, guint keylen)
  1248. {
  1249. guint64 k;
  1250. guint32 idx;
  1251. static const guint max_tries = 20;
  1252. struct upstream *up = NULL;
  1253. /* Generate 64 bits input key */
  1254. k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64,
  1255. key, keylen, ups->hash_seed);
  1256. RSPAMD_UPSTREAM_LOCK (ups);
  1257. /*
  1258. * Select new upstream from all upstreams
  1259. */
  1260. for (guint i = 0; i < max_tries; i ++) {
  1261. idx = rspamd_consistent_hash (k, ups->ups->len);
  1262. up = g_ptr_array_index (ups->ups, idx);
  1263. if (up->active_idx < 0 || (except != NULL && up == except)) {
  1264. /* Found inactive or excluded upstream */
  1265. k = mum_hash_step (k, ups->hash_seed);
  1266. }
  1267. else {
  1268. break;
  1269. }
  1270. }
  1271. RSPAMD_UPSTREAM_UNLOCK (ups);
  1272. if (up->active_idx >= 0) {
  1273. return up;
  1274. }
  1275. /* We failed to find any active upstream */
  1276. up = rspamd_upstream_get_random (ups, except);
  1277. msg_info ("failed to find hashed upstream for %s, fallback to random: %s",
  1278. ups->ups_line, up->name);
  1279. return up;
  1280. }
  1281. static struct upstream*
  1282. rspamd_upstream_get_common (struct upstream_list *ups,
  1283. struct upstream* except,
  1284. enum rspamd_upstream_rotation default_type,
  1285. const guchar *key, gsize keylen,
  1286. gboolean forced)
  1287. {
  1288. enum rspamd_upstream_rotation type;
  1289. struct upstream *up = NULL;
  1290. RSPAMD_UPSTREAM_LOCK (ups);
  1291. if (ups->alive->len == 0) {
  1292. /* We have no upstreams alive */
  1293. msg_warn ("there are no alive upstreams left for %s, revive all of them",
  1294. ups->ups_line);
  1295. g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups);
  1296. }
  1297. RSPAMD_UPSTREAM_UNLOCK (ups);
  1298. if (ups->alive->len == 1 && default_type != RSPAMD_UPSTREAM_SEQUENTIAL) {
  1299. /* Fast path */
  1300. up = g_ptr_array_index (ups->alive, 0);
  1301. goto end;
  1302. }
  1303. if (!forced) {
  1304. type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type;
  1305. }
  1306. else {
  1307. type = default_type != RSPAMD_UPSTREAM_UNDEF ? default_type : ups->rot_alg;
  1308. }
  1309. if (type == RSPAMD_UPSTREAM_HASHED && (keylen == 0 || key == NULL)) {
  1310. /* Cannot use hashed rotation when no key is specified, switch to random */
  1311. type = RSPAMD_UPSTREAM_RANDOM;
  1312. }
  1313. switch (type) {
  1314. default:
  1315. case RSPAMD_UPSTREAM_RANDOM:
  1316. up = rspamd_upstream_get_random (ups, except);
  1317. break;
  1318. case RSPAMD_UPSTREAM_HASHED:
  1319. up = rspamd_upstream_get_hashed (ups, except, key, keylen);
  1320. break;
  1321. case RSPAMD_UPSTREAM_ROUND_ROBIN:
  1322. up = rspamd_upstream_get_round_robin (ups, except, TRUE);
  1323. break;
  1324. case RSPAMD_UPSTREAM_MASTER_SLAVE:
  1325. up = rspamd_upstream_get_round_robin (ups, except, FALSE);
  1326. break;
  1327. case RSPAMD_UPSTREAM_SEQUENTIAL:
  1328. if (ups->cur_elt >= ups->alive->len) {
  1329. ups->cur_elt = 0;
  1330. return NULL;
  1331. }
  1332. up = g_ptr_array_index (ups->alive, ups->cur_elt ++);
  1333. break;
  1334. }
  1335. end:
  1336. if (up) {
  1337. up->checked ++;
  1338. }
  1339. return up;
  1340. }
  1341. struct upstream*
  1342. rspamd_upstream_get (struct upstream_list *ups,
  1343. enum rspamd_upstream_rotation default_type,
  1344. const guchar *key, gsize keylen)
  1345. {
  1346. return rspamd_upstream_get_common (ups, NULL, default_type, key, keylen, FALSE);
  1347. }
  1348. struct upstream*
  1349. rspamd_upstream_get_forced (struct upstream_list *ups,
  1350. enum rspamd_upstream_rotation forced_type,
  1351. const guchar *key, gsize keylen)
  1352. {
  1353. return rspamd_upstream_get_common (ups, NULL, forced_type, key, keylen, TRUE);
  1354. }
  1355. struct upstream *rspamd_upstream_get_except (struct upstream_list *ups,
  1356. struct upstream *except,
  1357. enum rspamd_upstream_rotation default_type,
  1358. const guchar *key, gsize keylen)
  1359. {
  1360. return rspamd_upstream_get_common (ups, except, default_type, key, keylen, FALSE);
  1361. }
  1362. void
  1363. rspamd_upstream_reresolve (struct upstream_ctx *ctx)
  1364. {
  1365. GList *cur;
  1366. struct upstream *up;
  1367. cur = ctx->upstreams->head;
  1368. while (cur) {
  1369. up = cur->data;
  1370. REF_RETAIN (up);
  1371. rspamd_upstream_resolve_addrs (up->ls, up);
  1372. REF_RELEASE (up);
  1373. cur = g_list_next (cur);
  1374. }
  1375. }
  1376. gpointer
  1377. rspamd_upstream_set_data (struct upstream *up, gpointer data)
  1378. {
  1379. gpointer prev_data = up->data;
  1380. up->data = data;
  1381. return prev_data;
  1382. }
  1383. gpointer
  1384. rspamd_upstream_get_data (struct upstream *up)
  1385. {
  1386. return up->data;
  1387. }
  1388. void
  1389. rspamd_upstreams_foreach (struct upstream_list *ups,
  1390. rspamd_upstream_traverse_func cb, void *ud)
  1391. {
  1392. struct upstream *up;
  1393. guint i;
  1394. for (i = 0; i < ups->ups->len; i ++) {
  1395. up = g_ptr_array_index (ups->ups, i);
  1396. cb (up, i, ud);
  1397. }
  1398. }
  1399. void
  1400. rspamd_upstreams_set_limits (struct upstream_list *ups,
  1401. gdouble revive_time,
  1402. gdouble revive_jitter,
  1403. gdouble error_time,
  1404. gdouble dns_timeout,
  1405. guint max_errors,
  1406. guint dns_retransmits)
  1407. {
  1408. struct upstream_limits *nlimits;
  1409. g_assert (ups != NULL);
  1410. nlimits = rspamd_mempool_alloc (ups->ctx->pool, sizeof (*nlimits));
  1411. memcpy (nlimits, ups->limits, sizeof (*nlimits));
  1412. if (!isnan (revive_time)) {
  1413. nlimits->revive_time = revive_time;
  1414. }
  1415. if (!isnan (revive_jitter)) {
  1416. nlimits->revive_jitter = revive_jitter;
  1417. }
  1418. if (!isnan (error_time)) {
  1419. nlimits->error_time = error_time;
  1420. }
  1421. if (!isnan (dns_timeout)) {
  1422. nlimits->dns_timeout = dns_timeout;
  1423. }
  1424. if (max_errors > 0) {
  1425. nlimits->max_errors = max_errors;
  1426. }
  1427. if (dns_retransmits > 0) {
  1428. nlimits->dns_retransmits = dns_retransmits;
  1429. }
  1430. ups->limits = nlimits;
  1431. }
  1432. void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
  1433. enum rspamd_upstreams_watch_event events,
  1434. rspamd_upstream_watch_func func,
  1435. GFreeFunc dtor,
  1436. gpointer ud)
  1437. {
  1438. struct upstream_list_watcher *nw;
  1439. g_assert ((events & RSPAMD_UPSTREAM_WATCH_ALL) != 0);
  1440. nw = g_malloc (sizeof (*nw));
  1441. nw->func = func;
  1442. nw->events_mask = events;
  1443. nw->ud = ud;
  1444. nw->dtor = dtor;
  1445. DL_APPEND (ups->watchers, nw);
  1446. }
  /*
   * Take an additional reference on `up` (REF_RETAIN).
   *
   * @return the same pointer, for call chaining
   */
  struct upstream*
  rspamd_upstream_ref (struct upstream *up)
  {
      struct upstream *retained = up;

      REF_RETAIN (retained);

      return retained;
  }
  /*
   * Drop one reference on `up` (REF_RELEASE). The caller must not use
   * `up` afterwards unless it holds another reference.
   */
  void
  rspamd_upstream_unref (struct upstream *up)
  {
      struct upstream *released = up;

      REF_RELEASE (released);
  }