You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

upstream.c 44KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774
  1. /*
  2. * Copyright 2024 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "upstream.h"
  18. #include "ottery.h"
  19. #include "ref.h"
  20. #include "cfg_file.h"
  21. #include "rdns.h"
  22. #include "cryptobox.h"
  23. #include "utlist.h"
  24. #include "contrib/libev/ev.h"
  25. #include "logger.h"
  26. #include "contrib/librdns/rdns.h"
  27. #include "contrib/mumhash/mum.h"
  28. #include <math.h>
  29. struct upstream_inet_addr_entry {
  30. rspamd_inet_addr_t *addr;
  31. unsigned int priority;
  32. struct upstream_inet_addr_entry *next;
  33. };
  34. struct upstream_addr_elt {
  35. rspamd_inet_addr_t *addr;
  36. unsigned int priority;
  37. unsigned int errors;
  38. };
  39. struct upstream_list_watcher {
  40. rspamd_upstream_watch_func func;
  41. GFreeFunc dtor;
  42. gpointer ud;
  43. enum rspamd_upstreams_watch_event events_mask;
  44. struct upstream_list_watcher *next, *prev;
  45. };
  46. struct upstream {
  47. unsigned int weight;
  48. unsigned int cur_weight;
  49. unsigned int errors;
  50. unsigned int checked;
  51. unsigned int dns_requests;
  52. int active_idx;
  53. unsigned int ttl;
  54. char *name;
  55. ev_timer ev;
  56. double last_fail;
  57. double last_resolve;
  58. gpointer ud;
  59. enum rspamd_upstream_flag flags;
  60. struct upstream_list *ls;
  61. GList *ctx_pos;
  62. struct upstream_ctx *ctx;
  63. struct {
  64. GPtrArray *addr; /* struct upstream_addr_elt */
  65. unsigned int cur;
  66. } addrs;
  67. struct upstream_inet_addr_entry *new_addrs;
  68. gpointer data;
  69. char uid[8];
  70. ref_entry_t ref;
  71. #ifdef UPSTREAMS_THREAD_SAFE
  72. rspamd_mutex_t *lock;
  73. #endif
  74. };
  /* Tunables controlling error accounting, revival and DNS re-resolution. */
  struct upstream_limits {
      double revive_time;           /* base delay before an inactive upstream is revived */
      double revive_jitter;         /* jitter applied to revive_time, as a fraction of it */
      double error_time;            /* length of the error-rate window */
      double dns_timeout;           /* per-request DNS timeout */
      double lazy_resolve_time;     /* base period of background re-resolution */
      double resolve_min_interval;  /* minimal delay between two resolutions */
      unsigned int max_errors;      /* errors tolerated per error_time */
      unsigned int dns_retransmits; /* DNS retransmit count */
  };
  85. struct upstream_list {
  86. char *ups_line;
  87. struct upstream_ctx *ctx;
  88. GPtrArray *ups;
  89. GPtrArray *alive;
  90. struct upstream_list_watcher *watchers;
  91. uint64_t hash_seed;
  92. const struct upstream_limits *limits;
  93. enum rspamd_upstream_flag flags;
  94. unsigned int cur_elt;
  95. enum rspamd_upstream_rotation rot_alg;
  96. #ifdef UPSTREAMS_THREAD_SAFE
  97. rspamd_mutex_t *lock;
  98. #endif
  99. };
  100. struct upstream_ctx {
  101. struct rdns_resolver *res;
  102. struct ev_loop *event_loop;
  103. struct upstream_limits limits;
  104. GQueue *upstreams;
  105. gboolean configured;
  106. rspamd_mempool_t *pool;
  107. ref_entry_t ref;
  108. };
  109. #ifndef UPSTREAMS_THREAD_SAFE
  110. #define RSPAMD_UPSTREAM_LOCK(x) \
  111. do { \
  112. } while (0)
  113. #define RSPAMD_UPSTREAM_UNLOCK(x) \
  114. do { \
  115. } while (0)
  116. #else
  117. #define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x->lock)
  118. #define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x->lock)
  119. #endif
  120. #define msg_debug_upstream(...) rspamd_conditional_debug_fast(NULL, NULL, \
  121. rspamd_upstream_log_id, "upstream", upstream->uid, \
  122. G_STRFUNC, \
  123. __VA_ARGS__)
  124. #define msg_info_upstream(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
  125. "upstream", upstream->uid, \
  126. G_STRFUNC, \
  127. __VA_ARGS__)
  128. #define msg_err_upstream(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
  129. "upstream", upstream->uid, \
  130. G_STRFUNC, \
  131. __VA_ARGS__)
  132. INIT_LOG_MODULE(upstream)
  133. /* 4 errors in 10 seconds */
  134. #define DEFAULT_MAX_ERRORS 4
  135. static const unsigned int default_max_errors = DEFAULT_MAX_ERRORS;
  136. #define DEFAULT_REVIVE_TIME 60
  137. static const double default_revive_time = DEFAULT_REVIVE_TIME;
  138. #define DEFAULT_REVIVE_JITTER 0.4
  139. static const double default_revive_jitter = DEFAULT_REVIVE_JITTER;
  140. #define DEFAULT_ERROR_TIME 10
  141. static const double default_error_time = DEFAULT_ERROR_TIME;
  142. #define DEFAULT_DNS_TIMEOUT 1.0
  143. static const double default_dns_timeout = DEFAULT_DNS_TIMEOUT;
  144. #define DEFAULT_DNS_RETRANSMITS 2
  145. static const unsigned int default_dns_retransmits = DEFAULT_DNS_RETRANSMITS;
  146. #define DEFAULT_LAZY_RESOLVE_TIME 3600.0
  147. static const double default_lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME;
  148. #define DEFAULT_RESOLVE_MIN_INTERVAL 60.0
  149. static const double default_resolve_min_interval = DEFAULT_RESOLVE_MIN_INTERVAL;
  150. static const struct upstream_limits default_limits = {
  151. .revive_time = DEFAULT_REVIVE_TIME,
  152. .revive_jitter = DEFAULT_REVIVE_JITTER,
  153. .error_time = DEFAULT_ERROR_TIME,
  154. .dns_timeout = DEFAULT_DNS_TIMEOUT,
  155. .dns_retransmits = DEFAULT_DNS_RETRANSMITS,
  156. .max_errors = DEFAULT_MAX_ERRORS,
  157. .lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME,
  158. .resolve_min_interval = DEFAULT_RESOLVE_MIN_INTERVAL,
  159. };
  160. static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
  161. void rspamd_upstreams_library_config(struct rspamd_config *cfg,
  162. struct upstream_ctx *ctx,
  163. struct ev_loop *event_loop,
  164. struct rdns_resolver *resolver)
  165. {
  166. g_assert(ctx != NULL);
  167. g_assert(cfg != NULL);
  168. if (cfg->upstream_error_time) {
  169. ctx->limits.error_time = cfg->upstream_error_time;
  170. }
  171. if (cfg->upstream_max_errors) {
  172. ctx->limits.max_errors = cfg->upstream_max_errors;
  173. }
  174. if (cfg->upstream_revive_time) {
  175. ctx->limits.revive_time = cfg->upstream_revive_time;
  176. }
  177. if (cfg->upstream_lazy_resolve_time) {
  178. ctx->limits.lazy_resolve_time = cfg->upstream_lazy_resolve_time;
  179. }
  180. if (cfg->dns_retransmits) {
  181. ctx->limits.dns_retransmits = cfg->dns_retransmits;
  182. }
  183. if (cfg->dns_timeout) {
  184. ctx->limits.dns_timeout = cfg->dns_timeout;
  185. }
  186. if (cfg->upstream_resolve_min_interval) {
  187. ctx->limits.resolve_min_interval = cfg->upstream_resolve_min_interval;
  188. }
  189. /* Some sanity checks */
  190. if (ctx->limits.resolve_min_interval > ctx->limits.revive_time) {
  191. /* We must be able to resolve host during the revive time */
  192. ctx->limits.resolve_min_interval = ctx->limits.revive_time;
  193. }
  194. ctx->event_loop = event_loop;
  195. ctx->res = resolver;
  196. ctx->configured = TRUE;
  197. /* Start lazy resolving */
  198. if (event_loop && resolver) {
  199. GList *cur;
  200. struct upstream *upstream;
  201. cur = ctx->upstreams->head;
  202. while (cur) {
  203. upstream = cur->data;
  204. if (!ev_can_stop(&upstream->ev) && upstream->ls &&
  205. !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  206. double when;
  207. if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
  208. /* Resolve them immediately ! */
  209. when = 0.0;
  210. }
  211. else {
  212. when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time,
  213. upstream->ls->limits->lazy_resolve_time * .1);
  214. }
  215. ev_timer_init(&upstream->ev, rspamd_upstream_lazy_resolve_cb,
  216. when, 0);
  217. upstream->ev.data = upstream;
  218. ev_timer_start(ctx->event_loop, &upstream->ev);
  219. }
  220. cur = g_list_next(cur);
  221. }
  222. }
  223. }
  224. static void
  225. rspamd_upstream_ctx_dtor(struct upstream_ctx *ctx)
  226. {
  227. GList *cur;
  228. struct upstream *u;
  229. cur = ctx->upstreams->head;
  230. while (cur) {
  231. u = cur->data;
  232. u->ctx = NULL;
  233. u->ctx_pos = NULL;
  234. cur = g_list_next(cur);
  235. }
  236. g_queue_free(ctx->upstreams);
  237. rspamd_mempool_delete(ctx->pool);
  238. g_free(ctx);
  239. }
  /*
   * Drops one reference on the upstream library context. The destructor
   * registered in rspamd_upstreams_library_init() runs when the count reaches zero.
   */
  void
  rspamd_upstreams_library_unref(struct upstream_ctx *ctx)
  {
      REF_RELEASE(ctx);
  }
  244. struct upstream_ctx *
  245. rspamd_upstreams_library_init(void)
  246. {
  247. struct upstream_ctx *ctx;
  248. ctx = g_malloc0(sizeof(*ctx));
  249. memcpy(&ctx->limits, &default_limits, sizeof(ctx->limits));
  250. ctx->pool = rspamd_mempool_new(rspamd_mempool_suggest_size(),
  251. "upstreams", 0);
  252. ctx->upstreams = g_queue_new();
  253. REF_INIT_RETAIN(ctx, rspamd_upstream_ctx_dtor);
  254. return ctx;
  255. }
  256. static int
  257. rspamd_upstream_af_to_weight(const rspamd_inet_addr_t *addr)
  258. {
  259. int ret;
  260. switch (rspamd_inet_address_get_af(addr)) {
  261. case AF_UNIX:
  262. ret = 2;
  263. break;
  264. case AF_INET:
  265. ret = 1;
  266. break;
  267. default:
  268. ret = 0;
  269. break;
  270. }
  271. return ret;
  272. }
  273. /*
  274. * Select IPv4 addresses before IPv6
  275. */
  276. static int
  277. rspamd_upstream_addr_sort_func(gconstpointer a, gconstpointer b)
  278. {
  279. const struct upstream_addr_elt *ip1 = *(const struct upstream_addr_elt **) a,
  280. *ip2 = *(const struct upstream_addr_elt **) b;
  281. int w1, w2;
  282. if (ip1->priority == 0 && ip2->priority == 0) {
  283. w1 = rspamd_upstream_af_to_weight(ip1->addr);
  284. w2 = rspamd_upstream_af_to_weight(ip2->addr);
  285. }
  286. else {
  287. w1 = ip1->priority;
  288. w2 = ip2->priority;
  289. }
  290. /* Inverse order */
  291. return w2 - w1;
  292. }
  293. static void
  294. rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
  295. {
  296. RSPAMD_UPSTREAM_LOCK(ls);
  297. g_ptr_array_add(ls->alive, upstream);
  298. upstream->active_idx = ls->alive->len - 1;
  299. if (upstream->ctx && upstream->ctx->configured &&
  300. !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  301. if (ev_can_stop(&upstream->ev)) {
  302. ev_timer_stop(upstream->ctx->event_loop, &upstream->ev);
  303. }
  304. /* Start lazy (or not so lazy) names resolution */
  305. double when;
  306. if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
  307. /* Resolve them immediately ! */
  308. when = 0.0;
  309. }
  310. else {
  311. when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time,
  312. upstream->ls->limits->lazy_resolve_time * .1);
  313. }
  314. ev_timer_init(&upstream->ev, rspamd_upstream_lazy_resolve_cb,
  315. when, 0);
  316. upstream->ev.data = upstream;
  317. msg_debug_upstream("start lazy resolving for %s in %.0f seconds",
  318. upstream->name, when);
  319. ev_timer_start(upstream->ctx->event_loop, &upstream->ev);
  320. }
  321. RSPAMD_UPSTREAM_UNLOCK(ls);
  322. }
  323. static void
  324. rspamd_upstream_addr_elt_dtor(gpointer a)
  325. {
  326. struct upstream_addr_elt *elt = a;
  327. if (elt) {
  328. rspamd_inet_address_free(elt->addr);
  329. g_free(elt);
  330. }
  331. }
  332. static void
  333. rspamd_upstream_update_addrs(struct upstream *upstream)
  334. {
  335. unsigned int addr_cnt, i, port;
  336. gboolean seen_addr, reset_errors = FALSE;
  337. struct upstream_inet_addr_entry *cur, *tmp;
  338. GPtrArray *new_addrs;
  339. struct upstream_addr_elt *addr_elt, *naddr;
  340. /*
  341. * We need first of all get the saved port, since DNS gives us no
  342. * idea about what port has been used previously
  343. */
  344. RSPAMD_UPSTREAM_LOCK(upstream);
  345. if (upstream->addrs.addr->len > 0 && upstream->new_addrs) {
  346. addr_elt = g_ptr_array_index(upstream->addrs.addr, 0);
  347. port = rspamd_inet_address_get_port(addr_elt->addr);
  348. /* Now calculate new addrs count */
  349. addr_cnt = 0;
  350. LL_FOREACH(upstream->new_addrs, cur)
  351. {
  352. addr_cnt++;
  353. }
  354. /* At 10% probability reset errors on addr elements */
  355. if (rspamd_random_double_fast() > 0.9) {
  356. reset_errors = TRUE;
  357. msg_debug_upstream("reset errors on upstream %s",
  358. upstream->name);
  359. }
  360. new_addrs = g_ptr_array_new_full(addr_cnt, rspamd_upstream_addr_elt_dtor);
  361. /* Copy addrs back */
  362. LL_FOREACH(upstream->new_addrs, cur)
  363. {
  364. seen_addr = FALSE;
  365. naddr = NULL;
  366. /* Ports are problematic, set to compare in the next block */
  367. rspamd_inet_address_set_port(cur->addr, port);
  368. PTR_ARRAY_FOREACH(upstream->addrs.addr, i, addr_elt)
  369. {
  370. if (rspamd_inet_address_compare(addr_elt->addr, cur->addr, FALSE) == 0) {
  371. naddr = g_malloc0(sizeof(*naddr));
  372. naddr->addr = cur->addr;
  373. naddr->errors = reset_errors ? 0 : addr_elt->errors;
  374. seen_addr = TRUE;
  375. break;
  376. }
  377. }
  378. if (!seen_addr) {
  379. naddr = g_malloc0(sizeof(*naddr));
  380. naddr->addr = cur->addr;
  381. naddr->errors = 0;
  382. msg_debug_upstream("new address for %s: %s",
  383. upstream->name,
  384. rspamd_inet_address_to_string_pretty(naddr->addr));
  385. }
  386. else {
  387. msg_debug_upstream("existing address for %s: %s",
  388. upstream->name,
  389. rspamd_inet_address_to_string_pretty(cur->addr));
  390. }
  391. g_ptr_array_add(new_addrs, naddr);
  392. }
  393. /* Free old addresses */
  394. g_ptr_array_free(upstream->addrs.addr, TRUE);
  395. upstream->addrs.cur = 0;
  396. upstream->addrs.addr = new_addrs;
  397. g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func);
  398. }
  399. LL_FOREACH_SAFE(upstream->new_addrs, cur, tmp)
  400. {
  401. /* Do not free inet address pointer since it has been transferred to up */
  402. g_free(cur);
  403. }
  404. upstream->new_addrs = NULL;
  405. RSPAMD_UPSTREAM_UNLOCK(upstream);
  406. }
  407. static void
  408. rspamd_upstream_dns_cb(struct rdns_reply *reply, void *arg)
  409. {
  410. struct upstream *up = (struct upstream *) arg;
  411. struct rdns_reply_entry *entry;
  412. struct upstream_inet_addr_entry *up_ent;
  413. if (reply->code == RDNS_RC_NOERROR) {
  414. entry = reply->entries;
  415. RSPAMD_UPSTREAM_LOCK(up);
  416. while (entry) {
  417. if (entry->type == RDNS_REQUEST_A) {
  418. up_ent = g_malloc0(sizeof(*up_ent));
  419. up_ent->addr = rspamd_inet_address_new(AF_INET,
  420. &entry->content.a.addr);
  421. LL_PREPEND(up->new_addrs, up_ent);
  422. }
  423. else if (entry->type == RDNS_REQUEST_AAAA) {
  424. up_ent = g_malloc0(sizeof(*up_ent));
  425. up_ent->addr = rspamd_inet_address_new(AF_INET6,
  426. &entry->content.aaa.addr);
  427. LL_PREPEND(up->new_addrs, up_ent);
  428. }
  429. entry = entry->next;
  430. }
  431. RSPAMD_UPSTREAM_UNLOCK(up);
  432. }
  433. up->dns_requests--;
  434. if (up->dns_requests == 0) {
  435. rspamd_upstream_update_addrs(up);
  436. }
  437. REF_RELEASE(up);
  438. }
  /*
   * State for one SRV target, shared by its A and AAAA follow-up lookups.
   * It is freed once requests_inflight drops back to zero.
   */
  struct rspamd_upstream_srv_dns_cb {
      struct upstream *up;            /* upstream being resolved */
      unsigned int priority;          /* priority given to the resolved addresses */
      unsigned int port;              /* port taken from the SRV record */
      unsigned int requests_inflight; /* follow-up lookups not completed yet */
  };
  445. /* Used when we have resolved SRV record and resolved addrs */
  446. static void
  447. rspamd_upstream_dns_srv_phase2_cb(struct rdns_reply *reply, void *arg)
  448. {
  449. struct rspamd_upstream_srv_dns_cb *cbdata =
  450. (struct rspamd_upstream_srv_dns_cb *) arg;
  451. struct upstream *up;
  452. struct rdns_reply_entry *entry;
  453. struct upstream_inet_addr_entry *up_ent;
  454. up = cbdata->up;
  455. if (reply->code == RDNS_RC_NOERROR) {
  456. entry = reply->entries;
  457. RSPAMD_UPSTREAM_LOCK(up);
  458. while (entry) {
  459. if (entry->type == RDNS_REQUEST_A) {
  460. up_ent = g_malloc0(sizeof(*up_ent));
  461. up_ent->addr = rspamd_inet_address_new(AF_INET,
  462. &entry->content.a.addr);
  463. up_ent->priority = cbdata->priority;
  464. rspamd_inet_address_set_port(up_ent->addr, cbdata->port);
  465. LL_PREPEND(up->new_addrs, up_ent);
  466. }
  467. else if (entry->type == RDNS_REQUEST_AAAA) {
  468. up_ent = g_malloc0(sizeof(*up_ent));
  469. up_ent->addr = rspamd_inet_address_new(AF_INET6,
  470. &entry->content.aaa.addr);
  471. up_ent->priority = cbdata->priority;
  472. rspamd_inet_address_set_port(up_ent->addr, cbdata->port);
  473. LL_PREPEND(up->new_addrs, up_ent);
  474. }
  475. entry = entry->next;
  476. }
  477. RSPAMD_UPSTREAM_UNLOCK(up);
  478. }
  479. up->dns_requests--;
  480. cbdata->requests_inflight--;
  481. if (cbdata->requests_inflight == 0) {
  482. g_free(cbdata);
  483. }
  484. if (up->dns_requests == 0) {
  485. rspamd_upstream_update_addrs(up);
  486. }
  487. REF_RELEASE(up);
  488. }
  489. static void
  490. rspamd_upstream_dns_srv_cb(struct rdns_reply *reply, void *arg)
  491. {
  492. struct upstream *upstream = (struct upstream *) arg;
  493. struct rdns_reply_entry *entry;
  494. struct rspamd_upstream_srv_dns_cb *ncbdata;
  495. if (reply->code == RDNS_RC_NOERROR) {
  496. entry = reply->entries;
  497. RSPAMD_UPSTREAM_LOCK(upstream);
  498. while (entry) {
  499. /* XXX: we ignore weight as it contradicts with upstreams logic */
  500. if (entry->type == RDNS_REQUEST_SRV) {
  501. msg_debug_upstream("got srv reply for %s: %s "
  502. "(weight=%d, priority=%d, port=%d)",
  503. upstream->name, entry->content.srv.target,
  504. entry->content.srv.weight, entry->content.srv.priority,
  505. entry->content.srv.port);
  506. ncbdata = g_malloc0(sizeof(*ncbdata));
  507. ncbdata->priority = entry->content.srv.weight;
  508. ncbdata->port = entry->content.srv.port;
  509. /* XXX: for all entries? */
  510. upstream->ttl = entry->ttl;
  511. if (rdns_make_request_full(upstream->ctx->res,
  512. rspamd_upstream_dns_srv_phase2_cb, ncbdata,
  513. upstream->ls->limits->dns_timeout,
  514. upstream->ls->limits->dns_retransmits,
  515. 1, entry->content.srv.target, RDNS_REQUEST_A) != NULL) {
  516. upstream->dns_requests++;
  517. REF_RETAIN(upstream);
  518. ncbdata->requests_inflight++;
  519. }
  520. if (rdns_make_request_full(upstream->ctx->res,
  521. rspamd_upstream_dns_srv_phase2_cb, ncbdata,
  522. upstream->ls->limits->dns_timeout,
  523. upstream->ls->limits->dns_retransmits,
  524. 1, entry->content.srv.target, RDNS_REQUEST_AAAA) != NULL) {
  525. upstream->dns_requests++;
  526. REF_RETAIN(upstream);
  527. ncbdata->requests_inflight++;
  528. }
  529. if (ncbdata->requests_inflight == 0) {
  530. g_free(ncbdata);
  531. }
  532. }
  533. entry = entry->next;
  534. }
  535. RSPAMD_UPSTREAM_UNLOCK(upstream);
  536. }
  537. upstream->dns_requests--;
  538. REF_RELEASE(upstream);
  539. }
  540. static void
  541. rspamd_upstream_revive_cb(struct ev_loop *loop, ev_timer *w, int revents)
  542. {
  543. struct upstream *upstream = (struct upstream *) w->data;
  544. RSPAMD_UPSTREAM_LOCK(upstream);
  545. ev_timer_stop(loop, w);
  546. msg_debug_upstream("revive upstream %s", upstream->name);
  547. if (upstream->ls) {
  548. rspamd_upstream_set_active(upstream->ls, upstream);
  549. }
  550. RSPAMD_UPSTREAM_UNLOCK(upstream);
  551. g_assert(upstream->ref.refcount > 1);
  552. REF_RELEASE(upstream);
  553. }
  554. static void
  555. rspamd_upstream_resolve_addrs(const struct upstream_list *ls,
  556. struct upstream *upstream)
  557. {
  558. if (upstream->ctx->res != NULL &&
  559. upstream->ctx->configured &&
  560. upstream->dns_requests == 0 &&
  561. !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  562. double now = ev_now(upstream->ctx->event_loop);
  563. if (now - upstream->last_resolve < upstream->ctx->limits.resolve_min_interval) {
  564. msg_info_upstream("do not resolve upstream %s as it was checked %.0f "
  565. "seconds ago (%.0f is minimum)",
  566. upstream->name, now - upstream->last_resolve,
  567. upstream->ctx->limits.resolve_min_interval);
  568. return;
  569. }
  570. /* Resolve name of the upstream one more time */
  571. if (upstream->name[0] != '/') {
  572. upstream->last_resolve = now;
  573. /*
  574. * If upstream name has a port, then we definitely need to resolve
  575. * merely host part!
  576. */
  577. char dns_name[253 + 1]; /* 253 == max dns name + \0 */
  578. const char *semicolon_pos = strchr(upstream->name, ':');
  579. if (semicolon_pos != NULL && semicolon_pos > upstream->name) {
  580. if (sizeof(dns_name) > semicolon_pos - upstream->name) {
  581. rspamd_strlcpy(dns_name, upstream->name,
  582. semicolon_pos - upstream->name + 1);
  583. }
  584. else {
  585. /* XXX: truncated */
  586. msg_err_upstream("internal error: upstream name is larger than"
  587. "max DNS name: %s",
  588. upstream->name);
  589. rspamd_strlcpy(dns_name, upstream->name, sizeof(dns_name));
  590. }
  591. }
  592. else {
  593. rspamd_strlcpy(dns_name, upstream->name, sizeof(dns_name));
  594. }
  595. if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
  596. if (rdns_make_request_full(upstream->ctx->res,
  597. rspamd_upstream_dns_srv_cb, upstream,
  598. ls->limits->dns_timeout, ls->limits->dns_retransmits,
  599. 1, dns_name, RDNS_REQUEST_SRV) != NULL) {
  600. upstream->dns_requests++;
  601. REF_RETAIN(upstream);
  602. }
  603. }
  604. else {
  605. if (rdns_make_request_full(upstream->ctx->res,
  606. rspamd_upstream_dns_cb, upstream,
  607. ls->limits->dns_timeout, ls->limits->dns_retransmits,
  608. 1, dns_name, RDNS_REQUEST_A) != NULL) {
  609. upstream->dns_requests++;
  610. REF_RETAIN(upstream);
  611. }
  612. if (rdns_make_request_full(upstream->ctx->res,
  613. rspamd_upstream_dns_cb, upstream,
  614. ls->limits->dns_timeout, ls->limits->dns_retransmits,
  615. 1, dns_name, RDNS_REQUEST_AAAA) != NULL) {
  616. upstream->dns_requests++;
  617. REF_RETAIN(upstream);
  618. }
  619. }
  620. }
  621. }
  622. else if (upstream->dns_requests != 0) {
  623. msg_info_upstream("do not resolve upstream %s as another request for "
  624. "resolving has been already issued",
  625. upstream->name);
  626. }
  627. }
  628. static void
  629. rspamd_upstream_lazy_resolve_cb(struct ev_loop *loop, ev_timer *w, int revents)
  630. {
  631. struct upstream *up = (struct upstream *) w->data;
  632. RSPAMD_UPSTREAM_LOCK(up);
  633. ev_timer_stop(loop, w);
  634. if (up->ls) {
  635. rspamd_upstream_resolve_addrs(up->ls, up);
  636. if (up->ttl == 0 || up->ttl > up->ls->limits->lazy_resolve_time) {
  637. w->repeat = rspamd_time_jitter(up->ls->limits->lazy_resolve_time,
  638. up->ls->limits->lazy_resolve_time * .1);
  639. }
  640. else {
  641. w->repeat = up->ttl;
  642. }
  643. ev_timer_again(loop, w);
  644. }
  645. RSPAMD_UPSTREAM_UNLOCK(up);
  646. }
  647. static void
  648. rspamd_upstream_set_inactive(struct upstream_list *ls, struct upstream *upstream)
  649. {
  650. double ntim;
  651. unsigned int i;
  652. struct upstream *cur;
  653. struct upstream_list_watcher *w;
  654. g_assert(upstream != NULL);
  655. RSPAMD_UPSTREAM_LOCK(ls);
  656. g_ptr_array_remove_index(ls->alive, upstream->active_idx);
  657. upstream->active_idx = -1;
  658. /* We need to update all indices */
  659. for (i = 0; i < ls->alive->len; i++) {
  660. cur = g_ptr_array_index(ls->alive, i);
  661. cur->active_idx = i;
  662. }
  663. if (upstream->ctx) {
  664. rspamd_upstream_resolve_addrs(ls, upstream);
  665. REF_RETAIN(upstream);
  666. ntim = rspamd_time_jitter(ls->limits->revive_time,
  667. ls->limits->revive_time * ls->limits->revive_jitter);
  668. if (ev_can_stop(&upstream->ev)) {
  669. ev_timer_stop(upstream->ctx->event_loop, &upstream->ev);
  670. }
  671. msg_debug_upstream("mark upstream %s inactive; revive in %.0f seconds",
  672. upstream->name, ntim);
  673. ev_timer_init(&upstream->ev, rspamd_upstream_revive_cb, ntim, 0);
  674. upstream->ev.data = upstream;
  675. if (upstream->ctx->event_loop != NULL && upstream->ctx->configured) {
  676. ev_timer_start(upstream->ctx->event_loop, &upstream->ev);
  677. }
  678. }
  679. DL_FOREACH(upstream->ls->watchers, w)
  680. {
  681. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
  682. w->func(upstream, RSPAMD_UPSTREAM_WATCH_OFFLINE, upstream->errors, w->ud);
  683. }
  684. }
  685. RSPAMD_UPSTREAM_UNLOCK(ls);
  686. }
  687. void rspamd_upstream_fail(struct upstream *upstream,
  688. gboolean addr_failure,
  689. const char *reason)
  690. {
  691. double error_rate = 0, max_error_rate = 0;
  692. double sec_last, sec_cur;
  693. struct upstream_addr_elt *addr_elt;
  694. struct upstream_list_watcher *w;
  695. g_assert(upstream != NULL);
  696. msg_debug_upstream("upstream %s failed; reason: %s",
  697. upstream->name,
  698. reason);
  699. if (upstream->ctx && upstream->active_idx != -1 && upstream->ls) {
  700. sec_cur = rspamd_get_ticks(FALSE);
  701. RSPAMD_UPSTREAM_LOCK(upstream);
  702. if (upstream->errors == 0) {
  703. /* We have the first error */
  704. upstream->last_fail = sec_cur;
  705. upstream->errors = 1;
  706. if (upstream->ls && upstream->dns_requests == 0) {
  707. /* Try to re-resolve address immediately */
  708. rspamd_upstream_resolve_addrs(upstream->ls, upstream);
  709. }
  710. DL_FOREACH(upstream->ls->watchers, w)
  711. {
  712. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
  713. w->func(upstream, RSPAMD_UPSTREAM_WATCH_FAILURE, 1, w->ud);
  714. }
  715. }
  716. }
  717. else {
  718. sec_last = upstream->last_fail;
  719. if (sec_cur >= sec_last) {
  720. upstream->errors++;
  721. DL_FOREACH(upstream->ls->watchers, w)
  722. {
  723. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
  724. w->func(upstream, RSPAMD_UPSTREAM_WATCH_FAILURE,
  725. upstream->errors, w->ud);
  726. }
  727. }
  728. if (sec_cur - sec_last >= upstream->ls->limits->error_time) {
  729. error_rate = ((double) upstream->errors) / (sec_cur - sec_last);
  730. max_error_rate = ((double) upstream->ls->limits->max_errors) /
  731. upstream->ls->limits->error_time;
  732. }
  733. if (error_rate > max_error_rate) {
  734. /* Remove upstream from the active list */
  735. if (upstream->ls->ups->len > 1) {
  736. msg_debug_upstream("mark upstream %s inactive; "
  737. "reason: %s; %.2f "
  738. "error rate (%d errors), "
  739. "%.2f max error rate, "
  740. "%.1f first error time, "
  741. "%.1f current ts, "
  742. "%d upstreams left",
  743. upstream->name,
  744. reason,
  745. error_rate,
  746. upstream->errors,
  747. max_error_rate,
  748. sec_last,
  749. sec_cur,
  750. upstream->ls->alive->len - 1);
  751. rspamd_upstream_set_inactive(upstream->ls, upstream);
  752. upstream->errors = 0;
  753. }
  754. else {
  755. msg_debug_upstream("cannot mark last alive upstream %s "
  756. "inactive; reason: %s; %.2f "
  757. "error rate (%d errors), "
  758. "%.2f max error rate, "
  759. "%.1f first error time, "
  760. "%.1f current ts",
  761. upstream->name,
  762. reason,
  763. error_rate,
  764. upstream->errors,
  765. max_error_rate,
  766. sec_last,
  767. sec_cur);
  768. /* Just re-resolve addresses */
  769. if (sec_cur - sec_last > upstream->ls->limits->revive_time) {
  770. upstream->errors = 0;
  771. rspamd_upstream_resolve_addrs(upstream->ls, upstream);
  772. }
  773. }
  774. }
  775. else if (sec_cur - sec_last >= upstream->ls->limits->error_time) {
  776. /* Forget the whole interval */
  777. upstream->last_fail = sec_cur;
  778. upstream->errors = 1;
  779. }
  780. }
  781. }
  782. if (addr_failure) {
  783. /* Also increase count of errors for this specific address */
  784. if (upstream->addrs.addr) {
  785. addr_elt = g_ptr_array_index(upstream->addrs.addr,
  786. upstream->addrs.cur);
  787. addr_elt->errors++;
  788. }
  789. }
  790. RSPAMD_UPSTREAM_UNLOCK(upstream);
  791. }
  792. }
  793. void rspamd_upstream_ok(struct upstream *upstream)
  794. {
  795. struct upstream_addr_elt *addr_elt;
  796. struct upstream_list_watcher *w;
  797. RSPAMD_UPSTREAM_LOCK(upstream);
  798. if (upstream->errors > 0 && upstream->active_idx != -1 && upstream->ls) {
  799. /* We touch upstream if and only if it is active */
  800. msg_debug_upstream("reset errors on upstream %s (was %ud)", upstream->name, upstream->errors);
  801. upstream->errors = 0;
  802. if (upstream->addrs.addr) {
  803. addr_elt = g_ptr_array_index(upstream->addrs.addr, upstream->addrs.cur);
  804. addr_elt->errors = 0;
  805. }
  806. DL_FOREACH(upstream->ls->watchers, w)
  807. {
  808. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
  809. w->func(upstream, RSPAMD_UPSTREAM_WATCH_SUCCESS, 0, w->ud);
  810. }
  811. }
  812. }
  813. RSPAMD_UPSTREAM_UNLOCK(upstream);
  814. }
  815. void rspamd_upstream_set_weight(struct upstream *up, unsigned int weight)
  816. {
  817. RSPAMD_UPSTREAM_LOCK(up);
  818. up->weight = weight;
  819. RSPAMD_UPSTREAM_UNLOCK(up);
  820. }
  821. #define SEED_CONSTANT 0xa574de7df64e9b9dULL
  822. struct upstream_list *
  823. rspamd_upstreams_create(struct upstream_ctx *ctx)
  824. {
  825. struct upstream_list *ls;
  826. ls = g_malloc0(sizeof(*ls));
  827. ls->hash_seed = SEED_CONSTANT;
  828. ls->ups = g_ptr_array_new();
  829. ls->alive = g_ptr_array_new();
  830. #ifdef UPSTREAMS_THREAD_SAFE
  831. ls->lock = rspamd_mutex_new();
  832. #endif
  833. ls->cur_elt = 0;
  834. ls->ctx = ctx;
  835. ls->rot_alg = RSPAMD_UPSTREAM_UNDEF;
  836. if (ctx) {
  837. ls->limits = &ctx->limits;
  838. }
  839. else {
  840. ls->limits = &default_limits;
  841. }
  842. return ls;
  843. }
  844. gsize rspamd_upstreams_count(struct upstream_list *ups)
  845. {
  846. return ups != NULL ? ups->ups->len : 0;
  847. }
  848. gsize rspamd_upstreams_alive(struct upstream_list *ups)
  849. {
  850. return ups != NULL ? ups->alive->len : 0;
  851. }
  852. static void
  853. rspamd_upstream_dtor(struct upstream *up)
  854. {
  855. struct upstream_inet_addr_entry *cur, *tmp;
  856. if (up->new_addrs) {
  857. LL_FOREACH_SAFE(up->new_addrs, cur, tmp)
  858. {
  859. /* Here we need to free pointer as well */
  860. rspamd_inet_address_free(cur->addr);
  861. g_free(cur);
  862. }
  863. }
  864. if (up->addrs.addr) {
  865. g_ptr_array_free(up->addrs.addr, TRUE);
  866. }
  867. #ifdef UPSTREAMS_THREAD_SAFE
  868. rspamd_mutex_free(up->lock);
  869. #endif
  870. if (up->ctx) {
  871. if (ev_can_stop(&up->ev)) {
  872. ev_timer_stop(up->ctx->event_loop, &up->ev);
  873. }
  874. g_queue_delete_link(up->ctx->upstreams, up->ctx_pos);
  875. REF_RELEASE(up->ctx);
  876. }
  877. g_free(up);
  878. }
  879. rspamd_inet_addr_t *
  880. rspamd_upstream_addr_next(struct upstream *up)
  881. {
  882. unsigned int idx, next_idx;
  883. struct upstream_addr_elt *e1, *e2;
  884. do {
  885. idx = up->addrs.cur;
  886. next_idx = (idx + 1) % up->addrs.addr->len;
  887. e1 = g_ptr_array_index(up->addrs.addr, idx);
  888. e2 = g_ptr_array_index(up->addrs.addr, next_idx);
  889. up->addrs.cur = next_idx;
  890. } while (e2->errors > e1->errors);
  891. return e2->addr;
  892. }
  893. rspamd_inet_addr_t *
  894. rspamd_upstream_addr_cur(const struct upstream *up)
  895. {
  896. struct upstream_addr_elt *elt;
  897. elt = g_ptr_array_index(up->addrs.addr, up->addrs.cur);
  898. return elt->addr;
  899. }
  900. const char *
  901. rspamd_upstream_name(struct upstream *up)
  902. {
  903. return up->name;
  904. }
  905. int rspamd_upstream_port(struct upstream *up)
  906. {
  907. struct upstream_addr_elt *elt;
  908. elt = g_ptr_array_index(up->addrs.addr, up->addrs.cur);
  909. return rspamd_inet_address_get_port(elt->addr);
  910. }
  911. gboolean
  912. rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str,
  913. uint16_t def_port, enum rspamd_upstream_parse_type parse_type,
  914. void *data)
  915. {
  916. struct upstream *upstream;
  917. GPtrArray *addrs = NULL;
  918. unsigned int i, slen;
  919. rspamd_inet_addr_t *addr;
  920. enum rspamd_parse_host_port_result ret = RSPAMD_PARSE_ADDR_FAIL;
  921. upstream = g_malloc0(sizeof(*upstream));
  922. slen = strlen(str);
  923. switch (parse_type) {
  924. case RSPAMD_UPSTREAM_PARSE_DEFAULT:
  925. if (slen > sizeof("service=") &&
  926. RSPAMD_LEN_CHECK_STARTS_WITH(str, slen, "service=")) {
  927. const char *plus_pos, *service_pos, *semicolon_pos;
  928. /* Accept service=srv_name+hostname[:priority] */
  929. service_pos = str + sizeof("service=") - 1;
  930. plus_pos = strchr(service_pos, '+');
  931. if (plus_pos != NULL) {
  932. semicolon_pos = strchr(plus_pos + 1, ':');
  933. if (semicolon_pos) {
  934. upstream->weight = strtoul(semicolon_pos + 1, NULL, 10);
  935. }
  936. else {
  937. semicolon_pos = plus_pos + strlen(plus_pos);
  938. }
  939. /*
  940. * Now our name is _service._tcp.<domain>
  941. * where <domain> is string between semicolon_pos and plus_pos +1
  942. * while service is a string between service_pos and plus_pos
  943. */
  944. unsigned int namelen = (semicolon_pos - (plus_pos + 1)) +
  945. (plus_pos - service_pos) +
  946. (sizeof("tcp") - 1) +
  947. 4;
  948. addrs = g_ptr_array_sized_new(1);
  949. upstream->name = ups->ctx ? rspamd_mempool_alloc(ups->ctx->pool, namelen + 1) : g_malloc(namelen + 1);
  950. rspamd_snprintf(upstream->name, namelen + 1,
  951. "_%*s._tcp.%*s",
  952. (int) (plus_pos - service_pos), service_pos,
  953. (int) (semicolon_pos - (plus_pos + 1)), plus_pos + 1);
  954. upstream->flags |= RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE;
  955. ret = RSPAMD_PARSE_ADDR_RESOLVED;
  956. }
  957. }
  958. else {
  959. ret = rspamd_parse_host_port_priority(str, &addrs,
  960. &upstream->weight,
  961. &upstream->name, def_port,
  962. FALSE,
  963. ups->ctx ? ups->ctx->pool : NULL);
  964. }
  965. break;
  966. case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
  967. addrs = g_ptr_array_sized_new(1);
  968. if (rspamd_parse_inet_address(&addr, str, strlen(str),
  969. RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
  970. if (ups->ctx) {
  971. upstream->name = rspamd_mempool_strdup(ups->ctx->pool, str);
  972. }
  973. else {
  974. upstream->name = g_strdup(str);
  975. }
  976. if (rspamd_inet_address_get_port(addr) == 0) {
  977. rspamd_inet_address_set_port(addr, def_port);
  978. }
  979. g_ptr_array_add(addrs, addr);
  980. ret = RSPAMD_PARSE_ADDR_NUMERIC;
  981. if (ups->ctx) {
  982. rspamd_mempool_add_destructor(ups->ctx->pool,
  983. (rspamd_mempool_destruct_t) rspamd_inet_address_free,
  984. addr);
  985. rspamd_mempool_add_destructor(ups->ctx->pool,
  986. (rspamd_mempool_destruct_t) rspamd_ptr_array_free_hard,
  987. addrs);
  988. }
  989. }
  990. else {
  991. g_ptr_array_free(addrs, TRUE);
  992. }
  993. break;
  994. }
  995. if (ret == RSPAMD_PARSE_ADDR_FAIL) {
  996. g_free(upstream);
  997. return FALSE;
  998. }
  999. else {
  1000. upstream->flags |= ups->flags;
  1001. if (ret == RSPAMD_PARSE_ADDR_NUMERIC) {
  1002. /* Add noresolve flag */
  1003. upstream->flags |= RSPAMD_UPSTREAM_FLAG_NORESOLVE;
  1004. }
  1005. for (i = 0; i < addrs->len; i++) {
  1006. addr = g_ptr_array_index(addrs, i);
  1007. rspamd_upstream_add_addr(upstream, rspamd_inet_address_copy(addr, NULL));
  1008. }
  1009. }
  1010. if (upstream->weight == 0 && ups->rot_alg == RSPAMD_UPSTREAM_MASTER_SLAVE) {
  1011. /* Special heuristic for master-slave rotation */
  1012. if (ups->ups->len == 0) {
  1013. /* Prioritize the first */
  1014. upstream->weight = 1;
  1015. }
  1016. }
  1017. g_ptr_array_add(ups->ups, upstream);
  1018. upstream->ud = data;
  1019. upstream->cur_weight = upstream->weight;
  1020. upstream->ls = ups;
  1021. REF_INIT_RETAIN(upstream, rspamd_upstream_dtor);
  1022. #ifdef UPSTREAMS_THREAD_SAFE
  1023. upstream->lock = rspamd_mutex_new();
  1024. #endif
  1025. upstream->ctx = ups->ctx;
  1026. if (upstream->ctx) {
  1027. REF_RETAIN(ups->ctx);
  1028. g_queue_push_tail(ups->ctx->upstreams, upstream);
  1029. upstream->ctx_pos = g_queue_peek_tail_link(ups->ctx->upstreams);
  1030. }
  1031. unsigned int h = rspamd_cryptobox_fast_hash(upstream->name,
  1032. strlen(upstream->name), 0);
  1033. memset(upstream->uid, 0, sizeof(upstream->uid));
  1034. rspamd_encode_base32_buf((const unsigned char *) &h, sizeof(h),
  1035. upstream->uid, sizeof(upstream->uid) - 1, RSPAMD_BASE32_DEFAULT);
  1036. msg_debug_upstream("added upstream %s (%s)", upstream->name,
  1037. upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : "DNS name");
  1038. g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func);
  1039. rspamd_upstream_set_active(ups, upstream);
  1040. return TRUE;
  1041. }
  1042. void rspamd_upstreams_set_flags(struct upstream_list *ups,
  1043. enum rspamd_upstream_flag flags)
  1044. {
  1045. ups->flags = flags;
  1046. }
  1047. void rspamd_upstreams_set_rotation(struct upstream_list *ups,
  1048. enum rspamd_upstream_rotation rot)
  1049. {
  1050. ups->rot_alg = rot;
  1051. }
  1052. gboolean
  1053. rspamd_upstream_add_addr(struct upstream *up, rspamd_inet_addr_t *addr)
  1054. {
  1055. struct upstream_addr_elt *elt;
  1056. /*
  1057. * XXX: slow and inefficient
  1058. */
  1059. if (up->addrs.addr == NULL) {
  1060. up->addrs.addr = g_ptr_array_new_full(8, rspamd_upstream_addr_elt_dtor);
  1061. }
  1062. elt = g_malloc0(sizeof(*elt));
  1063. elt->addr = addr;
  1064. g_ptr_array_add(up->addrs.addr, elt);
  1065. g_ptr_array_sort(up->addrs.addr, rspamd_upstream_addr_sort_func);
  1066. return TRUE;
  1067. }
  1068. gboolean
  1069. rspamd_upstreams_parse_line_len(struct upstream_list *ups,
  1070. const char *str, gsize len, uint16_t def_port, void *data)
  1071. {
  1072. const char *end = str + len, *p = str;
  1073. const char *separators = ";, \n\r\t";
  1074. char *tmp;
  1075. unsigned int span_len;
  1076. gboolean ret = FALSE;
  1077. if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "random:")) {
  1078. ups->rot_alg = RSPAMD_UPSTREAM_RANDOM;
  1079. p += sizeof("random:") - 1;
  1080. }
  1081. else if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "master-slave:")) {
  1082. ups->rot_alg = RSPAMD_UPSTREAM_MASTER_SLAVE;
  1083. p += sizeof("master-slave:") - 1;
  1084. }
  1085. else if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "round-robin:")) {
  1086. ups->rot_alg = RSPAMD_UPSTREAM_ROUND_ROBIN;
  1087. p += sizeof("round-robin:") - 1;
  1088. }
  1089. else if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "hash:")) {
  1090. ups->rot_alg = RSPAMD_UPSTREAM_HASHED;
  1091. p += sizeof("hash:") - 1;
  1092. }
  1093. while (p < end) {
  1094. span_len = rspamd_memcspn(p, separators, end - p);
  1095. if (span_len > 0) {
  1096. tmp = g_malloc(span_len + 1);
  1097. rspamd_strlcpy(tmp, p, span_len + 1);
  1098. if (rspamd_upstreams_add_upstream(ups, tmp, def_port,
  1099. RSPAMD_UPSTREAM_PARSE_DEFAULT,
  1100. data)) {
  1101. ret = TRUE;
  1102. }
  1103. g_free(tmp);
  1104. }
  1105. p += span_len;
  1106. /* Skip separators */
  1107. if (p < end) {
  1108. p += rspamd_memspn(p, separators, end - p);
  1109. }
  1110. }
  1111. if (!ups->ups_line) {
  1112. ups->ups_line = g_malloc(len + 1);
  1113. rspamd_strlcpy(ups->ups_line, str, len + 1);
  1114. }
  1115. return ret;
  1116. }
  1117. gboolean
  1118. rspamd_upstreams_parse_line(struct upstream_list *ups,
  1119. const char *str, uint16_t def_port, void *data)
  1120. {
  1121. return rspamd_upstreams_parse_line_len(ups, str, strlen(str),
  1122. def_port, data);
  1123. }
  1124. gboolean
  1125. rspamd_upstreams_from_ucl(struct upstream_list *ups,
  1126. const ucl_object_t *in, uint16_t def_port, void *data)
  1127. {
  1128. gboolean ret = FALSE;
  1129. const ucl_object_t *cur;
  1130. ucl_object_iter_t it = NULL;
  1131. it = ucl_object_iterate_new(in);
  1132. while ((cur = ucl_object_iterate_safe(it, true)) != NULL) {
  1133. if (ucl_object_type(cur) == UCL_STRING) {
  1134. ret = rspamd_upstreams_parse_line(ups, ucl_object_tostring(cur),
  1135. def_port, data);
  1136. }
  1137. }
  1138. ucl_object_iterate_free(it);
  1139. return ret;
  1140. }
  1141. void rspamd_upstreams_destroy(struct upstream_list *ups)
  1142. {
  1143. unsigned int i;
  1144. struct upstream *up;
  1145. struct upstream_list_watcher *w, *tmp;
  1146. if (ups != NULL) {
  1147. g_ptr_array_free(ups->alive, TRUE);
  1148. for (i = 0; i < ups->ups->len; i++) {
  1149. up = g_ptr_array_index(ups->ups, i);
  1150. up->ls = NULL;
  1151. REF_RELEASE(up);
  1152. }
  1153. DL_FOREACH_SAFE(ups->watchers, w, tmp)
  1154. {
  1155. if (w->dtor) {
  1156. w->dtor(w->ud);
  1157. }
  1158. g_free(w);
  1159. }
  1160. g_free(ups->ups_line);
  1161. g_ptr_array_free(ups->ups, TRUE);
  1162. #ifdef UPSTREAMS_THREAD_SAFE
  1163. rspamd_mutex_free(ups->lock);
  1164. #endif
  1165. g_free(ups);
  1166. }
  1167. }
  1168. static void
  1169. rspamd_upstream_restore_cb(gpointer elt, gpointer ls)
  1170. {
  1171. struct upstream *up = (struct upstream *) elt;
  1172. struct upstream_list *ups = (struct upstream_list *) ls;
  1173. struct upstream_list_watcher *w;
  1174. /* Here the upstreams list is already locked */
  1175. RSPAMD_UPSTREAM_LOCK(up);
  1176. if (ev_can_stop(&up->ev)) {
  1177. ev_timer_stop(up->ctx->event_loop, &up->ev);
  1178. }
  1179. g_ptr_array_add(ups->alive, up);
  1180. up->active_idx = ups->alive->len - 1;
  1181. RSPAMD_UPSTREAM_UNLOCK(up);
  1182. DL_FOREACH(up->ls->watchers, w)
  1183. {
  1184. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
  1185. w->func(up, RSPAMD_UPSTREAM_WATCH_ONLINE, up->errors, w->ud);
  1186. }
  1187. }
  1188. /* For revive event */
  1189. g_assert(up->ref.refcount > 1);
  1190. REF_RELEASE(up);
  1191. }
  1192. static struct upstream *
  1193. rspamd_upstream_get_random(struct upstream_list *ups,
  1194. struct upstream *except)
  1195. {
  1196. for (;;) {
  1197. unsigned int idx = ottery_rand_range(ups->alive->len - 1);
  1198. struct upstream *up;
  1199. up = g_ptr_array_index(ups->alive, idx);
  1200. if (except && up == except) {
  1201. continue;
  1202. }
  1203. return up;
  1204. }
  1205. }
  1206. static struct upstream *
  1207. rspamd_upstream_get_round_robin(struct upstream_list *ups,
  1208. struct upstream *except,
  1209. gboolean use_cur)
  1210. {
  1211. unsigned int max_weight = 0, min_checked = G_MAXUINT;
  1212. struct upstream *up = NULL, *selected = NULL, *min_checked_sel = NULL;
  1213. unsigned int i;
  1214. /* Select upstream with the maximum cur_weight */
  1215. RSPAMD_UPSTREAM_LOCK(ups);
  1216. for (i = 0; i < ups->alive->len; i++) {
  1217. up = g_ptr_array_index(ups->alive, i);
  1218. if (except != NULL && up == except) {
  1219. continue;
  1220. }
  1221. if (use_cur) {
  1222. if (up->cur_weight > max_weight) {
  1223. selected = up;
  1224. max_weight = up->cur_weight;
  1225. }
  1226. }
  1227. else {
  1228. if (up->weight > max_weight) {
  1229. selected = up;
  1230. max_weight = up->weight;
  1231. }
  1232. }
  1233. /*
  1234. * This code is used when all upstreams have zero weight
  1235. * The logic is to select least currently used upstream and penalise
  1236. * upstream with errors. The error penalty should no be too high
  1237. * to avoid sudden traffic drop in this case.
  1238. */
  1239. if (up->checked + up->errors * 2 < min_checked) {
  1240. min_checked_sel = up;
  1241. min_checked = up->checked;
  1242. }
  1243. }
  1244. if (max_weight == 0) {
  1245. /* All upstreams have zero weight */
  1246. if (min_checked > G_MAXUINT / 2) {
  1247. /* Reset all checked counters to avoid overflow */
  1248. for (i = 0; i < ups->alive->len; i++) {
  1249. up = g_ptr_array_index(ups->alive, i);
  1250. up->checked = 0;
  1251. }
  1252. }
  1253. selected = min_checked_sel;
  1254. }
  1255. if (use_cur && selected) {
  1256. if (selected->cur_weight > 0) {
  1257. selected->cur_weight--;
  1258. }
  1259. else {
  1260. selected->cur_weight = selected->weight;
  1261. }
  1262. }
  1263. RSPAMD_UPSTREAM_UNLOCK(ups);
  1264. return selected;
  1265. }
  1266. /*
  1267. * The key idea of this function is obtained from the following paper:
  1268. * A Fast, Minimal Memory, Consistent Hash Algorithm
  1269. * John Lamping, Eric Veach
  1270. *
  1271. * http://arxiv.org/abs/1406.2294
  1272. */
  1273. static uint32_t
  1274. rspamd_consistent_hash(uint64_t key, uint32_t nbuckets)
  1275. {
  1276. int64_t b = -1, j = 0;
  1277. while (j < nbuckets) {
  1278. b = j;
  1279. key *= 2862933555777941757ULL + 1;
  1280. j = (b + 1) * (double) (1ULL << 31) / (double) ((key >> 33) + 1ULL);
  1281. }
  1282. return b;
  1283. }
  1284. static struct upstream *
  1285. rspamd_upstream_get_hashed(struct upstream_list *ups,
  1286. struct upstream *except,
  1287. const uint8_t *key, unsigned int keylen)
  1288. {
  1289. uint64_t k;
  1290. uint32_t idx;
  1291. static const unsigned int max_tries = 20;
  1292. struct upstream *up = NULL;
  1293. /* Generate 64 bits input key */
  1294. k = rspamd_cryptobox_fast_hash_specific(RSPAMD_CRYPTOBOX_XXHASH64,
  1295. key, keylen, ups->hash_seed);
  1296. RSPAMD_UPSTREAM_LOCK(ups);
  1297. /*
  1298. * Select new upstream from all upstreams
  1299. */
  1300. for (unsigned int i = 0; i < max_tries; i++) {
  1301. idx = rspamd_consistent_hash(k, ups->ups->len);
  1302. up = g_ptr_array_index(ups->ups, idx);
  1303. if (up->active_idx < 0 || (except != NULL && up == except)) {
  1304. /* Found inactive or excluded upstream */
  1305. k = mum_hash_step(k, ups->hash_seed);
  1306. }
  1307. else {
  1308. break;
  1309. }
  1310. }
  1311. RSPAMD_UPSTREAM_UNLOCK(ups);
  1312. if (up->active_idx >= 0) {
  1313. return up;
  1314. }
  1315. /* We failed to find any active upstream */
  1316. up = rspamd_upstream_get_random(ups, except);
  1317. msg_info("failed to find hashed upstream for %s, fallback to random: %s",
  1318. ups->ups_line, up->name);
  1319. return up;
  1320. }
  1321. static struct upstream *
  1322. rspamd_upstream_get_common(struct upstream_list *ups,
  1323. struct upstream *except,
  1324. enum rspamd_upstream_rotation default_type,
  1325. const unsigned char *key, gsize keylen,
  1326. gboolean forced)
  1327. {
  1328. enum rspamd_upstream_rotation type;
  1329. struct upstream *up = NULL;
  1330. RSPAMD_UPSTREAM_LOCK(ups);
  1331. if (ups->alive->len == 0) {
  1332. /* We have no upstreams alive */
  1333. msg_warn("there are no alive upstreams left for %s, revive all of them",
  1334. ups->ups_line);
  1335. g_ptr_array_foreach(ups->ups, rspamd_upstream_restore_cb, ups);
  1336. }
  1337. RSPAMD_UPSTREAM_UNLOCK(ups);
  1338. if (ups->alive->len == 1 && default_type != RSPAMD_UPSTREAM_SEQUENTIAL) {
  1339. /* Fast path */
  1340. up = g_ptr_array_index(ups->alive, 0);
  1341. goto end;
  1342. }
  1343. if (!forced) {
  1344. type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type;
  1345. }
  1346. else {
  1347. type = default_type != RSPAMD_UPSTREAM_UNDEF ? default_type : ups->rot_alg;
  1348. }
  1349. if (type == RSPAMD_UPSTREAM_HASHED && (keylen == 0 || key == NULL)) {
  1350. /* Cannot use hashed rotation when no key is specified, switch to random */
  1351. type = RSPAMD_UPSTREAM_RANDOM;
  1352. }
  1353. switch (type) {
  1354. default:
  1355. case RSPAMD_UPSTREAM_RANDOM:
  1356. up = rspamd_upstream_get_random(ups, except);
  1357. break;
  1358. case RSPAMD_UPSTREAM_HASHED:
  1359. up = rspamd_upstream_get_hashed(ups, except, key, keylen);
  1360. break;
  1361. case RSPAMD_UPSTREAM_ROUND_ROBIN:
  1362. up = rspamd_upstream_get_round_robin(ups, except, TRUE);
  1363. break;
  1364. case RSPAMD_UPSTREAM_MASTER_SLAVE:
  1365. up = rspamd_upstream_get_round_robin(ups, except, FALSE);
  1366. break;
  1367. case RSPAMD_UPSTREAM_SEQUENTIAL:
  1368. if (ups->cur_elt >= ups->alive->len) {
  1369. ups->cur_elt = 0;
  1370. return NULL;
  1371. }
  1372. up = g_ptr_array_index(ups->alive, ups->cur_elt++);
  1373. break;
  1374. }
  1375. end:
  1376. if (up) {
  1377. up->checked++;
  1378. }
  1379. return up;
  1380. }
  1381. struct upstream *
  1382. rspamd_upstream_get(struct upstream_list *ups,
  1383. enum rspamd_upstream_rotation default_type,
  1384. const unsigned char *key, gsize keylen)
  1385. {
  1386. return rspamd_upstream_get_common(ups, NULL, default_type, key, keylen, FALSE);
  1387. }
  1388. struct upstream *
  1389. rspamd_upstream_get_forced(struct upstream_list *ups,
  1390. enum rspamd_upstream_rotation forced_type,
  1391. const unsigned char *key, gsize keylen)
  1392. {
  1393. return rspamd_upstream_get_common(ups, NULL, forced_type, key, keylen, TRUE);
  1394. }
  1395. struct upstream *rspamd_upstream_get_except(struct upstream_list *ups,
  1396. struct upstream *except,
  1397. enum rspamd_upstream_rotation default_type,
  1398. const unsigned char *key, gsize keylen)
  1399. {
  1400. return rspamd_upstream_get_common(ups, except, default_type, key, keylen, FALSE);
  1401. }
  1402. void rspamd_upstream_reresolve(struct upstream_ctx *ctx)
  1403. {
  1404. GList *cur;
  1405. struct upstream *up;
  1406. cur = ctx->upstreams->head;
  1407. while (cur) {
  1408. up = cur->data;
  1409. g_assert(up != NULL);
  1410. REF_RETAIN(up);
  1411. rspamd_upstream_resolve_addrs(up->ls, up);
  1412. REF_RELEASE(up);
  1413. cur = g_list_next(cur);
  1414. }
  1415. }
  1416. gpointer
  1417. rspamd_upstream_set_data(struct upstream *up, gpointer data)
  1418. {
  1419. gpointer prev_data = up->data;
  1420. up->data = data;
  1421. return prev_data;
  1422. }
  1423. gpointer
  1424. rspamd_upstream_get_data(struct upstream *up)
  1425. {
  1426. return up->data;
  1427. }
  1428. void rspamd_upstreams_foreach(struct upstream_list *ups,
  1429. rspamd_upstream_traverse_func cb, void *ud)
  1430. {
  1431. struct upstream *up;
  1432. unsigned int i;
  1433. for (i = 0; i < ups->ups->len; i++) {
  1434. up = g_ptr_array_index(ups->ups, i);
  1435. cb(up, i, ud);
  1436. }
  1437. }
  1438. void rspamd_upstreams_set_limits(struct upstream_list *ups,
  1439. double revive_time,
  1440. double revive_jitter,
  1441. double error_time,
  1442. double dns_timeout,
  1443. unsigned int max_errors,
  1444. unsigned int dns_retransmits)
  1445. {
  1446. struct upstream_limits *nlimits;
  1447. g_assert(ups != NULL);
  1448. nlimits = rspamd_mempool_alloc(ups->ctx->pool, sizeof(*nlimits));
  1449. memcpy(nlimits, ups->limits, sizeof(*nlimits));
  1450. if (!isnan(revive_time)) {
  1451. nlimits->revive_time = revive_time;
  1452. }
  1453. if (!isnan(revive_jitter)) {
  1454. nlimits->revive_jitter = revive_jitter;
  1455. }
  1456. if (!isnan(error_time)) {
  1457. nlimits->error_time = error_time;
  1458. }
  1459. if (!isnan(dns_timeout)) {
  1460. nlimits->dns_timeout = dns_timeout;
  1461. }
  1462. if (max_errors > 0) {
  1463. nlimits->max_errors = max_errors;
  1464. }
  1465. if (dns_retransmits > 0) {
  1466. nlimits->dns_retransmits = dns_retransmits;
  1467. }
  1468. ups->limits = nlimits;
  1469. }
  1470. void rspamd_upstreams_add_watch_callback(struct upstream_list *ups,
  1471. enum rspamd_upstreams_watch_event events,
  1472. rspamd_upstream_watch_func func,
  1473. GFreeFunc dtor,
  1474. gpointer ud)
  1475. {
  1476. struct upstream_list_watcher *nw;
  1477. g_assert((events & RSPAMD_UPSTREAM_WATCH_ALL) != 0);
  1478. nw = g_malloc(sizeof(*nw));
  1479. nw->func = func;
  1480. nw->events_mask = events;
  1481. nw->ud = ud;
  1482. nw->dtor = dtor;
  1483. DL_APPEND(ups->watchers, nw);
  1484. }
  1485. struct upstream *
  1486. rspamd_upstream_ref(struct upstream *up)
  1487. {
  1488. REF_RETAIN(up);
  1489. return up;
  1490. }
  1491. void rspamd_upstream_unref(struct upstream *up)
  1492. {
  1493. REF_RELEASE(up);
  1494. }