You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

upstream.c 44KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761
  1. /*
  2. * Copyright 2024 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "upstream.h"
  18. #include "ottery.h"
  19. #include "ref.h"
  20. #include "cfg_file.h"
  21. #include "rdns.h"
  22. #include "cryptobox.h"
  23. #include "utlist.h"
  24. #include "contrib/libev/ev.h"
  25. #include "logger.h"
  26. #include "contrib/librdns/rdns.h"
  27. #include "contrib/mumhash/mum.h"
  28. #include <math.h>
  29. struct upstream_inet_addr_entry {
  30. rspamd_inet_addr_t *addr;
  31. unsigned int priority;
  32. struct upstream_inet_addr_entry *next;
  33. };
  34. struct upstream_addr_elt {
  35. rspamd_inet_addr_t *addr;
  36. unsigned int priority;
  37. unsigned int errors;
  38. };
  39. struct upstream_list_watcher {
  40. rspamd_upstream_watch_func func;
  41. GFreeFunc dtor;
  42. gpointer ud;
  43. enum rspamd_upstreams_watch_event events_mask;
  44. struct upstream_list_watcher *next, *prev;
  45. };
  46. struct upstream {
  47. unsigned int weight;
  48. unsigned int cur_weight;
  49. unsigned int errors;
  50. unsigned int checked;
  51. unsigned int dns_requests;
  52. int active_idx;
  53. unsigned int ttl;
  54. char *name;
  55. ev_timer ev;
  56. double last_fail;
  57. double last_resolve;
  58. gpointer ud;
  59. enum rspamd_upstream_flag flags;
  60. struct upstream_list *ls;
  61. GList *ctx_pos;
  62. struct upstream_ctx *ctx;
  63. struct {
  64. GPtrArray *addr; /* struct upstream_addr_elt */
  65. unsigned int cur;
  66. } addrs;
  67. struct upstream_inet_addr_entry *new_addrs;
  68. gpointer data;
  69. char uid[8];
  70. ref_entry_t ref;
  71. #ifdef UPSTREAMS_THREAD_SAFE
  72. rspamd_mutex_t *lock;
  73. #endif
  74. };
  75. struct upstream_limits {
  76. double revive_time;
  77. double revive_jitter;
  78. double error_time;
  79. double dns_timeout;
  80. double lazy_resolve_time;
  81. unsigned int max_errors;
  82. unsigned int dns_retransmits;
  83. };
  84. struct upstream_list {
  85. char *ups_line;
  86. struct upstream_ctx *ctx;
  87. GPtrArray *ups;
  88. GPtrArray *alive;
  89. struct upstream_list_watcher *watchers;
  90. uint64_t hash_seed;
  91. const struct upstream_limits *limits;
  92. enum rspamd_upstream_flag flags;
  93. unsigned int cur_elt;
  94. enum rspamd_upstream_rotation rot_alg;
  95. #ifdef UPSTREAMS_THREAD_SAFE
  96. rspamd_mutex_t *lock;
  97. #endif
  98. };
  99. struct upstream_ctx {
  100. struct rdns_resolver *res;
  101. struct ev_loop *event_loop;
  102. struct upstream_limits limits;
  103. GQueue *upstreams;
  104. gboolean configured;
  105. rspamd_mempool_t *pool;
  106. ref_entry_t ref;
  107. };
  108. #ifndef UPSTREAMS_THREAD_SAFE
  109. #define RSPAMD_UPSTREAM_LOCK(x) \
  110. do { \
  111. } while (0)
  112. #define RSPAMD_UPSTREAM_UNLOCK(x) \
  113. do { \
  114. } while (0)
  115. #else
  116. #define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x->lock)
  117. #define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x->lock)
  118. #endif
  119. #define msg_debug_upstream(...) rspamd_conditional_debug_fast(NULL, NULL, \
  120. rspamd_upstream_log_id, "upstream", upstream->uid, \
  121. G_STRFUNC, \
  122. __VA_ARGS__)
  123. #define msg_info_upstream(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
  124. "upstream", upstream->uid, \
  125. G_STRFUNC, \
  126. __VA_ARGS__)
  127. #define msg_err_upstream(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
  128. "upstream", upstream->uid, \
  129. G_STRFUNC, \
  130. __VA_ARGS__)
  131. INIT_LOG_MODULE(upstream)
  132. /* 4 errors in 10 seconds */
  133. #define DEFAULT_MAX_ERRORS 4
  134. static const unsigned int default_max_errors = DEFAULT_MAX_ERRORS;
  135. #define DEFAULT_REVIVE_TIME 60
  136. static const double default_revive_time = DEFAULT_REVIVE_TIME;
  137. #define DEFAULT_REVIVE_JITTER 0.4
  138. static const double default_revive_jitter = DEFAULT_REVIVE_JITTER;
  139. #define DEFAULT_ERROR_TIME 10
  140. static const double default_error_time = DEFAULT_ERROR_TIME;
  141. #define DEFAULT_DNS_TIMEOUT 1.0
  142. static const double default_dns_timeout = DEFAULT_DNS_TIMEOUT;
  143. #define DEFAULT_DNS_RETRANSMITS 2
  144. static const unsigned int default_dns_retransmits = DEFAULT_DNS_RETRANSMITS;
  145. /* TODO: make it configurable */
  146. #define DEFAULT_LAZY_RESOLVE_TIME 3600.0
  147. static const double default_lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME;
  148. static const struct upstream_limits default_limits = {
  149. .revive_time = DEFAULT_REVIVE_TIME,
  150. .revive_jitter = DEFAULT_REVIVE_JITTER,
  151. .error_time = DEFAULT_ERROR_TIME,
  152. .dns_timeout = DEFAULT_DNS_TIMEOUT,
  153. .dns_retransmits = DEFAULT_DNS_RETRANSMITS,
  154. .max_errors = DEFAULT_MAX_ERRORS,
  155. .lazy_resolve_time = DEFAULT_LAZY_RESOLVE_TIME,
  156. };
  157. static void rspamd_upstream_lazy_resolve_cb(struct ev_loop *, ev_timer *, int);
  158. void rspamd_upstreams_library_config(struct rspamd_config *cfg,
  159. struct upstream_ctx *ctx,
  160. struct ev_loop *event_loop,
  161. struct rdns_resolver *resolver)
  162. {
  163. g_assert(ctx != NULL);
  164. g_assert(cfg != NULL);
  165. if (cfg->upstream_error_time) {
  166. ctx->limits.error_time = cfg->upstream_error_time;
  167. }
  168. if (cfg->upstream_max_errors) {
  169. ctx->limits.max_errors = cfg->upstream_max_errors;
  170. }
  171. if (cfg->upstream_revive_time) {
  172. ctx->limits.revive_time = cfg->upstream_revive_time;
  173. }
  174. if (cfg->upstream_lazy_resolve_time) {
  175. ctx->limits.lazy_resolve_time = cfg->upstream_lazy_resolve_time;
  176. }
  177. if (cfg->dns_retransmits) {
  178. ctx->limits.dns_retransmits = cfg->dns_retransmits;
  179. }
  180. if (cfg->dns_timeout) {
  181. ctx->limits.dns_timeout = cfg->dns_timeout;
  182. }
  183. ctx->event_loop = event_loop;
  184. ctx->res = resolver;
  185. ctx->configured = TRUE;
  186. /* Start lazy resolving */
  187. if (event_loop && resolver) {
  188. GList *cur;
  189. struct upstream *upstream;
  190. cur = ctx->upstreams->head;
  191. while (cur) {
  192. upstream = cur->data;
  193. if (!ev_can_stop(&upstream->ev) && upstream->ls &&
  194. !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  195. double when;
  196. if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
  197. /* Resolve them immediately ! */
  198. when = 0.0;
  199. }
  200. else {
  201. when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time,
  202. upstream->ls->limits->lazy_resolve_time * .1);
  203. }
  204. ev_timer_init(&upstream->ev, rspamd_upstream_lazy_resolve_cb,
  205. when, 0);
  206. upstream->ev.data = upstream;
  207. ev_timer_start(ctx->event_loop, &upstream->ev);
  208. }
  209. cur = g_list_next(cur);
  210. }
  211. }
  212. }
  213. static void
  214. rspamd_upstream_ctx_dtor(struct upstream_ctx *ctx)
  215. {
  216. GList *cur;
  217. struct upstream *u;
  218. cur = ctx->upstreams->head;
  219. while (cur) {
  220. u = cur->data;
  221. u->ctx = NULL;
  222. u->ctx_pos = NULL;
  223. cur = g_list_next(cur);
  224. }
  225. g_queue_free(ctx->upstreams);
  226. rspamd_mempool_delete(ctx->pool);
  227. g_free(ctx);
  228. }
  229. void rspamd_upstreams_library_unref(struct upstream_ctx *ctx)
  230. {
  231. REF_RELEASE(ctx);
  232. }
  233. struct upstream_ctx *
  234. rspamd_upstreams_library_init(void)
  235. {
  236. struct upstream_ctx *ctx;
  237. ctx = g_malloc0(sizeof(*ctx));
  238. memcpy(&ctx->limits, &default_limits, sizeof(ctx->limits));
  239. ctx->pool = rspamd_mempool_new(rspamd_mempool_suggest_size(),
  240. "upstreams", 0);
  241. ctx->upstreams = g_queue_new();
  242. REF_INIT_RETAIN(ctx, rspamd_upstream_ctx_dtor);
  243. return ctx;
  244. }
  245. static int
  246. rspamd_upstream_af_to_weight(const rspamd_inet_addr_t *addr)
  247. {
  248. int ret;
  249. switch (rspamd_inet_address_get_af(addr)) {
  250. case AF_UNIX:
  251. ret = 2;
  252. break;
  253. case AF_INET:
  254. ret = 1;
  255. break;
  256. default:
  257. ret = 0;
  258. break;
  259. }
  260. return ret;
  261. }
  262. /*
  263. * Select IPv4 addresses before IPv6
  264. */
  265. static int
  266. rspamd_upstream_addr_sort_func(gconstpointer a, gconstpointer b)
  267. {
  268. const struct upstream_addr_elt *ip1 = *(const struct upstream_addr_elt **) a,
  269. *ip2 = *(const struct upstream_addr_elt **) b;
  270. int w1, w2;
  271. if (ip1->priority == 0 && ip2->priority == 0) {
  272. w1 = rspamd_upstream_af_to_weight(ip1->addr);
  273. w2 = rspamd_upstream_af_to_weight(ip2->addr);
  274. }
  275. else {
  276. w1 = ip1->priority;
  277. w2 = ip2->priority;
  278. }
  279. /* Inverse order */
  280. return w2 - w1;
  281. }
  282. static void
  283. rspamd_upstream_set_active(struct upstream_list *ls, struct upstream *upstream)
  284. {
  285. RSPAMD_UPSTREAM_LOCK(ls);
  286. g_ptr_array_add(ls->alive, upstream);
  287. upstream->active_idx = ls->alive->len - 1;
  288. if (upstream->ctx && upstream->ctx->configured &&
  289. !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  290. if (ev_can_stop(&upstream->ev)) {
  291. ev_timer_stop(upstream->ctx->event_loop, &upstream->ev);
  292. }
  293. /* Start lazy (or not so lazy) names resolution */
  294. double when;
  295. if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
  296. /* Resolve them immediately ! */
  297. when = 0.0;
  298. }
  299. else {
  300. when = rspamd_time_jitter(upstream->ls->limits->lazy_resolve_time,
  301. upstream->ls->limits->lazy_resolve_time * .1);
  302. }
  303. ev_timer_init(&upstream->ev, rspamd_upstream_lazy_resolve_cb,
  304. when, 0);
  305. upstream->ev.data = upstream;
  306. msg_debug_upstream("start lazy resolving for %s in %.0f seconds",
  307. upstream->name, when);
  308. ev_timer_start(upstream->ctx->event_loop, &upstream->ev);
  309. }
  310. RSPAMD_UPSTREAM_UNLOCK(ls);
  311. }
  312. static void
  313. rspamd_upstream_addr_elt_dtor(gpointer a)
  314. {
  315. struct upstream_addr_elt *elt = a;
  316. if (elt) {
  317. rspamd_inet_address_free(elt->addr);
  318. g_free(elt);
  319. }
  320. }
  321. static void
  322. rspamd_upstream_update_addrs(struct upstream *upstream)
  323. {
  324. unsigned int addr_cnt, i, port;
  325. gboolean seen_addr, reset_errors = FALSE;
  326. struct upstream_inet_addr_entry *cur, *tmp;
  327. GPtrArray *new_addrs;
  328. struct upstream_addr_elt *addr_elt, *naddr;
  329. /*
  330. * We need first of all get the saved port, since DNS gives us no
  331. * idea about what port has been used previously
  332. */
  333. RSPAMD_UPSTREAM_LOCK(upstream);
  334. if (upstream->addrs.addr->len > 0 && upstream->new_addrs) {
  335. addr_elt = g_ptr_array_index(upstream->addrs.addr, 0);
  336. port = rspamd_inet_address_get_port(addr_elt->addr);
  337. /* Now calculate new addrs count */
  338. addr_cnt = 0;
  339. LL_FOREACH(upstream->new_addrs, cur)
  340. {
  341. addr_cnt++;
  342. }
  343. /* At 10% probability reset errors on addr elements */
  344. if (rspamd_random_double_fast() > 0.9) {
  345. reset_errors = TRUE;
  346. msg_debug_upstream("reset errors on upstream %s",
  347. upstream->name);
  348. }
  349. new_addrs = g_ptr_array_new_full(addr_cnt, rspamd_upstream_addr_elt_dtor);
  350. /* Copy addrs back */
  351. LL_FOREACH(upstream->new_addrs, cur)
  352. {
  353. seen_addr = FALSE;
  354. naddr = NULL;
  355. /* Ports are problematic, set to compare in the next block */
  356. rspamd_inet_address_set_port(cur->addr, port);
  357. PTR_ARRAY_FOREACH(upstream->addrs.addr, i, addr_elt)
  358. {
  359. if (rspamd_inet_address_compare(addr_elt->addr, cur->addr, FALSE) == 0) {
  360. naddr = g_malloc0(sizeof(*naddr));
  361. naddr->addr = cur->addr;
  362. naddr->errors = reset_errors ? 0 : addr_elt->errors;
  363. seen_addr = TRUE;
  364. break;
  365. }
  366. }
  367. if (!seen_addr) {
  368. naddr = g_malloc0(sizeof(*naddr));
  369. naddr->addr = cur->addr;
  370. naddr->errors = 0;
  371. msg_debug_upstream("new address for %s: %s",
  372. upstream->name,
  373. rspamd_inet_address_to_string_pretty(naddr->addr));
  374. }
  375. else {
  376. msg_debug_upstream("existing address for %s: %s",
  377. upstream->name,
  378. rspamd_inet_address_to_string_pretty(cur->addr));
  379. }
  380. g_ptr_array_add(new_addrs, naddr);
  381. }
  382. /* Free old addresses */
  383. g_ptr_array_free(upstream->addrs.addr, TRUE);
  384. upstream->addrs.cur = 0;
  385. upstream->addrs.addr = new_addrs;
  386. g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func);
  387. }
  388. LL_FOREACH_SAFE(upstream->new_addrs, cur, tmp)
  389. {
  390. /* Do not free inet address pointer since it has been transferred to up */
  391. g_free(cur);
  392. }
  393. upstream->new_addrs = NULL;
  394. RSPAMD_UPSTREAM_UNLOCK(upstream);
  395. }
  396. static void
  397. rspamd_upstream_dns_cb(struct rdns_reply *reply, void *arg)
  398. {
  399. struct upstream *up = (struct upstream *) arg;
  400. struct rdns_reply_entry *entry;
  401. struct upstream_inet_addr_entry *up_ent;
  402. if (reply->code == RDNS_RC_NOERROR) {
  403. entry = reply->entries;
  404. RSPAMD_UPSTREAM_LOCK(up);
  405. while (entry) {
  406. if (entry->type == RDNS_REQUEST_A) {
  407. up_ent = g_malloc0(sizeof(*up_ent));
  408. up_ent->addr = rspamd_inet_address_new(AF_INET,
  409. &entry->content.a.addr);
  410. LL_PREPEND(up->new_addrs, up_ent);
  411. }
  412. else if (entry->type == RDNS_REQUEST_AAAA) {
  413. up_ent = g_malloc0(sizeof(*up_ent));
  414. up_ent->addr = rspamd_inet_address_new(AF_INET6,
  415. &entry->content.aaa.addr);
  416. LL_PREPEND(up->new_addrs, up_ent);
  417. }
  418. entry = entry->next;
  419. }
  420. RSPAMD_UPSTREAM_UNLOCK(up);
  421. }
  422. up->dns_requests--;
  423. if (up->dns_requests == 0) {
  424. rspamd_upstream_update_addrs(up);
  425. }
  426. REF_RELEASE(up);
  427. }
  428. struct rspamd_upstream_srv_dns_cb {
  429. struct upstream *up;
  430. unsigned int priority;
  431. unsigned int port;
  432. unsigned int requests_inflight;
  433. };
  434. /* Used when we have resolved SRV record and resolved addrs */
  435. static void
  436. rspamd_upstream_dns_srv_phase2_cb(struct rdns_reply *reply, void *arg)
  437. {
  438. struct rspamd_upstream_srv_dns_cb *cbdata =
  439. (struct rspamd_upstream_srv_dns_cb *) arg;
  440. struct upstream *up;
  441. struct rdns_reply_entry *entry;
  442. struct upstream_inet_addr_entry *up_ent;
  443. up = cbdata->up;
  444. if (reply->code == RDNS_RC_NOERROR) {
  445. entry = reply->entries;
  446. RSPAMD_UPSTREAM_LOCK(up);
  447. while (entry) {
  448. if (entry->type == RDNS_REQUEST_A) {
  449. up_ent = g_malloc0(sizeof(*up_ent));
  450. up_ent->addr = rspamd_inet_address_new(AF_INET,
  451. &entry->content.a.addr);
  452. up_ent->priority = cbdata->priority;
  453. rspamd_inet_address_set_port(up_ent->addr, cbdata->port);
  454. LL_PREPEND(up->new_addrs, up_ent);
  455. }
  456. else if (entry->type == RDNS_REQUEST_AAAA) {
  457. up_ent = g_malloc0(sizeof(*up_ent));
  458. up_ent->addr = rspamd_inet_address_new(AF_INET6,
  459. &entry->content.aaa.addr);
  460. up_ent->priority = cbdata->priority;
  461. rspamd_inet_address_set_port(up_ent->addr, cbdata->port);
  462. LL_PREPEND(up->new_addrs, up_ent);
  463. }
  464. entry = entry->next;
  465. }
  466. RSPAMD_UPSTREAM_UNLOCK(up);
  467. }
  468. up->dns_requests--;
  469. cbdata->requests_inflight--;
  470. if (cbdata->requests_inflight == 0) {
  471. g_free(cbdata);
  472. }
  473. if (up->dns_requests == 0) {
  474. rspamd_upstream_update_addrs(up);
  475. }
  476. REF_RELEASE(up);
  477. }
  478. static void
  479. rspamd_upstream_dns_srv_cb(struct rdns_reply *reply, void *arg)
  480. {
  481. struct upstream *upstream = (struct upstream *) arg;
  482. struct rdns_reply_entry *entry;
  483. struct rspamd_upstream_srv_dns_cb *ncbdata;
  484. if (reply->code == RDNS_RC_NOERROR) {
  485. entry = reply->entries;
  486. RSPAMD_UPSTREAM_LOCK(upstream);
  487. while (entry) {
  488. /* XXX: we ignore weight as it contradicts with upstreams logic */
  489. if (entry->type == RDNS_REQUEST_SRV) {
  490. msg_debug_upstream("got srv reply for %s: %s "
  491. "(weight=%d, priority=%d, port=%d)",
  492. upstream->name, entry->content.srv.target,
  493. entry->content.srv.weight, entry->content.srv.priority,
  494. entry->content.srv.port);
  495. ncbdata = g_malloc0(sizeof(*ncbdata));
  496. ncbdata->priority = entry->content.srv.weight;
  497. ncbdata->port = entry->content.srv.port;
  498. /* XXX: for all entries? */
  499. upstream->ttl = entry->ttl;
  500. if (rdns_make_request_full(upstream->ctx->res,
  501. rspamd_upstream_dns_srv_phase2_cb, ncbdata,
  502. upstream->ls->limits->dns_timeout,
  503. upstream->ls->limits->dns_retransmits,
  504. 1, entry->content.srv.target, RDNS_REQUEST_A) != NULL) {
  505. upstream->dns_requests++;
  506. REF_RETAIN(upstream);
  507. ncbdata->requests_inflight++;
  508. }
  509. if (rdns_make_request_full(upstream->ctx->res,
  510. rspamd_upstream_dns_srv_phase2_cb, ncbdata,
  511. upstream->ls->limits->dns_timeout,
  512. upstream->ls->limits->dns_retransmits,
  513. 1, entry->content.srv.target, RDNS_REQUEST_AAAA) != NULL) {
  514. upstream->dns_requests++;
  515. REF_RETAIN(upstream);
  516. ncbdata->requests_inflight++;
  517. }
  518. if (ncbdata->requests_inflight == 0) {
  519. g_free(ncbdata);
  520. }
  521. }
  522. entry = entry->next;
  523. }
  524. RSPAMD_UPSTREAM_UNLOCK(upstream);
  525. }
  526. upstream->dns_requests--;
  527. REF_RELEASE(upstream);
  528. }
  529. static void
  530. rspamd_upstream_revive_cb(struct ev_loop *loop, ev_timer *w, int revents)
  531. {
  532. struct upstream *upstream = (struct upstream *) w->data;
  533. RSPAMD_UPSTREAM_LOCK(upstream);
  534. ev_timer_stop(loop, w);
  535. msg_debug_upstream("revive upstream %s", upstream->name);
  536. if (upstream->ls) {
  537. rspamd_upstream_set_active(upstream->ls, upstream);
  538. }
  539. RSPAMD_UPSTREAM_UNLOCK(upstream);
  540. g_assert(upstream->ref.refcount > 1);
  541. REF_RELEASE(upstream);
  542. }
  543. static void
  544. rspamd_upstream_resolve_addrs(const struct upstream_list *ls,
  545. struct upstream *upstream)
  546. {
  547. /* XXX: maybe make it configurable */
  548. static const double min_resolve_interval = 60.0;
  549. if (upstream->ctx->res != NULL &&
  550. upstream->ctx->configured &&
  551. upstream->dns_requests == 0 &&
  552. !(upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  553. double now = ev_now(upstream->ctx->event_loop);
  554. if (now - upstream->last_resolve < min_resolve_interval) {
  555. msg_info_upstream("do not resolve upstream %s as it was checked %.0f "
  556. "seconds ago (%.0f is minimum)",
  557. upstream->name, now - upstream->last_resolve,
  558. min_resolve_interval);
  559. return;
  560. }
  561. /* Resolve name of the upstream one more time */
  562. if (upstream->name[0] != '/') {
  563. upstream->last_resolve = now;
  564. /*
  565. * If upstream name has a port, then we definitely need to resolve
  566. * merely host part!
  567. */
  568. char dns_name[253 + 1]; /* 253 == max dns name + \0 */
  569. const char *semicolon_pos = strchr(upstream->name, ':');
  570. if (semicolon_pos != NULL && semicolon_pos > upstream->name) {
  571. if (sizeof(dns_name) > semicolon_pos - upstream->name) {
  572. rspamd_strlcpy(dns_name, upstream->name,
  573. semicolon_pos - upstream->name + 1);
  574. }
  575. else {
  576. /* XXX: truncated */
  577. msg_err_upstream("internal error: upstream name is larger than"
  578. "max DNS name: %s",
  579. upstream->name);
  580. rspamd_strlcpy(dns_name, upstream->name, sizeof(dns_name));
  581. }
  582. }
  583. else {
  584. rspamd_strlcpy(dns_name, upstream->name, sizeof(dns_name));
  585. }
  586. if (upstream->flags & RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE) {
  587. if (rdns_make_request_full(upstream->ctx->res,
  588. rspamd_upstream_dns_srv_cb, upstream,
  589. ls->limits->dns_timeout, ls->limits->dns_retransmits,
  590. 1, dns_name, RDNS_REQUEST_SRV) != NULL) {
  591. upstream->dns_requests++;
  592. REF_RETAIN(upstream);
  593. }
  594. }
  595. else {
  596. if (rdns_make_request_full(upstream->ctx->res,
  597. rspamd_upstream_dns_cb, upstream,
  598. ls->limits->dns_timeout, ls->limits->dns_retransmits,
  599. 1, dns_name, RDNS_REQUEST_A) != NULL) {
  600. upstream->dns_requests++;
  601. REF_RETAIN(upstream);
  602. }
  603. if (rdns_make_request_full(upstream->ctx->res,
  604. rspamd_upstream_dns_cb, upstream,
  605. ls->limits->dns_timeout, ls->limits->dns_retransmits,
  606. 1, dns_name, RDNS_REQUEST_AAAA) != NULL) {
  607. upstream->dns_requests++;
  608. REF_RETAIN(upstream);
  609. }
  610. }
  611. }
  612. }
  613. else if (upstream->dns_requests != 0) {
  614. msg_info_upstream("do not resolve upstream %s as another request for "
  615. "resolving has been already issued",
  616. upstream->name);
  617. }
  618. }
  619. static void
  620. rspamd_upstream_lazy_resolve_cb(struct ev_loop *loop, ev_timer *w, int revents)
  621. {
  622. struct upstream *up = (struct upstream *) w->data;
  623. RSPAMD_UPSTREAM_LOCK(up);
  624. ev_timer_stop(loop, w);
  625. if (up->ls) {
  626. rspamd_upstream_resolve_addrs(up->ls, up);
  627. if (up->ttl == 0 || up->ttl > up->ls->limits->lazy_resolve_time) {
  628. w->repeat = rspamd_time_jitter(up->ls->limits->lazy_resolve_time,
  629. up->ls->limits->lazy_resolve_time * .1);
  630. }
  631. else {
  632. w->repeat = up->ttl;
  633. }
  634. ev_timer_again(loop, w);
  635. }
  636. RSPAMD_UPSTREAM_UNLOCK(up);
  637. }
  638. static void
  639. rspamd_upstream_set_inactive(struct upstream_list *ls, struct upstream *upstream)
  640. {
  641. double ntim;
  642. unsigned int i;
  643. struct upstream *cur;
  644. struct upstream_list_watcher *w;
  645. RSPAMD_UPSTREAM_LOCK(ls);
  646. g_ptr_array_remove_index(ls->alive, upstream->active_idx);
  647. upstream->active_idx = -1;
  648. /* We need to update all indices */
  649. for (i = 0; i < ls->alive->len; i++) {
  650. cur = g_ptr_array_index(ls->alive, i);
  651. cur->active_idx = i;
  652. }
  653. if (upstream->ctx) {
  654. rspamd_upstream_resolve_addrs(ls, upstream);
  655. REF_RETAIN(upstream);
  656. ntim = rspamd_time_jitter(ls->limits->revive_time,
  657. ls->limits->revive_time * ls->limits->revive_jitter);
  658. if (ev_can_stop(&upstream->ev)) {
  659. ev_timer_stop(upstream->ctx->event_loop, &upstream->ev);
  660. }
  661. msg_debug_upstream("mark upstream %s inactive; revive in %.0f seconds",
  662. upstream->name, ntim);
  663. ev_timer_init(&upstream->ev, rspamd_upstream_revive_cb, ntim, 0);
  664. upstream->ev.data = upstream;
  665. if (upstream->ctx->event_loop != NULL && upstream->ctx->configured) {
  666. ev_timer_start(upstream->ctx->event_loop, &upstream->ev);
  667. }
  668. }
  669. DL_FOREACH(upstream->ls->watchers, w)
  670. {
  671. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
  672. w->func(upstream, RSPAMD_UPSTREAM_WATCH_OFFLINE, upstream->errors, w->ud);
  673. }
  674. }
  675. RSPAMD_UPSTREAM_UNLOCK(ls);
  676. }
  677. void rspamd_upstream_fail(struct upstream *upstream,
  678. gboolean addr_failure,
  679. const char *reason)
  680. {
  681. double error_rate = 0, max_error_rate = 0;
  682. double sec_last, sec_cur;
  683. struct upstream_addr_elt *addr_elt;
  684. struct upstream_list_watcher *w;
  685. msg_debug_upstream("upstream %s failed; reason: %s",
  686. upstream->name,
  687. reason);
  688. if (upstream->ctx && upstream->active_idx != -1 && upstream->ls) {
  689. sec_cur = rspamd_get_ticks(FALSE);
  690. RSPAMD_UPSTREAM_LOCK(upstream);
  691. if (upstream->errors == 0) {
  692. /* We have the first error */
  693. upstream->last_fail = sec_cur;
  694. upstream->errors = 1;
  695. if (upstream->ls && upstream->dns_requests == 0) {
  696. /* Try to re-resolve address immediately */
  697. rspamd_upstream_resolve_addrs(upstream->ls, upstream);
  698. }
  699. DL_FOREACH(upstream->ls->watchers, w)
  700. {
  701. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
  702. w->func(upstream, RSPAMD_UPSTREAM_WATCH_FAILURE, 1, w->ud);
  703. }
  704. }
  705. }
  706. else {
  707. sec_last = upstream->last_fail;
  708. if (sec_cur >= sec_last) {
  709. upstream->errors++;
  710. DL_FOREACH(upstream->ls->watchers, w)
  711. {
  712. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
  713. w->func(upstream, RSPAMD_UPSTREAM_WATCH_FAILURE,
  714. upstream->errors, w->ud);
  715. }
  716. }
  717. if (sec_cur - sec_last >= upstream->ls->limits->error_time) {
  718. error_rate = ((double) upstream->errors) / (sec_cur - sec_last);
  719. max_error_rate = ((double) upstream->ls->limits->max_errors) /
  720. upstream->ls->limits->error_time;
  721. }
  722. if (error_rate > max_error_rate) {
  723. /* Remove upstream from the active list */
  724. if (upstream->ls->ups->len > 1) {
  725. msg_debug_upstream("mark upstream %s inactive; "
  726. "reason: %s; %.2f "
  727. "error rate (%d errors), "
  728. "%.2f max error rate, "
  729. "%.1f first error time, "
  730. "%.1f current ts, "
  731. "%d upstreams left",
  732. upstream->name,
  733. reason,
  734. error_rate,
  735. upstream->errors,
  736. max_error_rate,
  737. sec_last,
  738. sec_cur,
  739. upstream->ls->alive->len - 1);
  740. rspamd_upstream_set_inactive(upstream->ls, upstream);
  741. upstream->errors = 0;
  742. }
  743. else {
  744. msg_debug_upstream("cannot mark last alive upstream %s "
  745. "inactive; reason: %s; %.2f "
  746. "error rate (%d errors), "
  747. "%.2f max error rate, "
  748. "%.1f first error time, "
  749. "%.1f current ts",
  750. upstream->name,
  751. reason,
  752. error_rate,
  753. upstream->errors,
  754. max_error_rate,
  755. sec_last,
  756. sec_cur);
  757. /* Just re-resolve addresses */
  758. if (sec_cur - sec_last > upstream->ls->limits->revive_time) {
  759. upstream->errors = 0;
  760. rspamd_upstream_resolve_addrs(upstream->ls, upstream);
  761. }
  762. }
  763. }
  764. else if (sec_cur - sec_last >= upstream->ls->limits->error_time) {
  765. /* Forget the whole interval */
  766. upstream->last_fail = sec_cur;
  767. upstream->errors = 1;
  768. }
  769. }
  770. }
  771. if (addr_failure) {
  772. /* Also increase count of errors for this specific address */
  773. if (upstream->addrs.addr) {
  774. addr_elt = g_ptr_array_index(upstream->addrs.addr,
  775. upstream->addrs.cur);
  776. addr_elt->errors++;
  777. }
  778. }
  779. RSPAMD_UPSTREAM_UNLOCK(upstream);
  780. }
  781. }
  782. void rspamd_upstream_ok(struct upstream *upstream)
  783. {
  784. struct upstream_addr_elt *addr_elt;
  785. struct upstream_list_watcher *w;
  786. RSPAMD_UPSTREAM_LOCK(upstream);
  787. if (upstream->errors > 0 && upstream->active_idx != -1 && upstream->ls) {
  788. /* We touch upstream if and only if it is active */
  789. msg_debug_upstream("reset errors on upstream %s (was %ud)", upstream->name, upstream->errors);
  790. upstream->errors = 0;
  791. if (upstream->addrs.addr) {
  792. addr_elt = g_ptr_array_index(upstream->addrs.addr, upstream->addrs.cur);
  793. addr_elt->errors = 0;
  794. }
  795. DL_FOREACH(upstream->ls->watchers, w)
  796. {
  797. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
  798. w->func(upstream, RSPAMD_UPSTREAM_WATCH_SUCCESS, 0, w->ud);
  799. }
  800. }
  801. }
  802. RSPAMD_UPSTREAM_UNLOCK(upstream);
  803. }
  804. void rspamd_upstream_set_weight(struct upstream *up, unsigned int weight)
  805. {
  806. RSPAMD_UPSTREAM_LOCK(up);
  807. up->weight = weight;
  808. RSPAMD_UPSTREAM_UNLOCK(up);
  809. }
  810. #define SEED_CONSTANT 0xa574de7df64e9b9dULL
  811. struct upstream_list *
  812. rspamd_upstreams_create(struct upstream_ctx *ctx)
  813. {
  814. struct upstream_list *ls;
  815. ls = g_malloc0(sizeof(*ls));
  816. ls->hash_seed = SEED_CONSTANT;
  817. ls->ups = g_ptr_array_new();
  818. ls->alive = g_ptr_array_new();
  819. #ifdef UPSTREAMS_THREAD_SAFE
  820. ls->lock = rspamd_mutex_new();
  821. #endif
  822. ls->cur_elt = 0;
  823. ls->ctx = ctx;
  824. ls->rot_alg = RSPAMD_UPSTREAM_UNDEF;
  825. if (ctx) {
  826. ls->limits = &ctx->limits;
  827. }
  828. else {
  829. ls->limits = &default_limits;
  830. }
  831. return ls;
  832. }
  833. gsize rspamd_upstreams_count(struct upstream_list *ups)
  834. {
  835. return ups != NULL ? ups->ups->len : 0;
  836. }
  837. gsize rspamd_upstreams_alive(struct upstream_list *ups)
  838. {
  839. return ups != NULL ? ups->alive->len : 0;
  840. }
  841. static void
  842. rspamd_upstream_dtor(struct upstream *up)
  843. {
  844. struct upstream_inet_addr_entry *cur, *tmp;
  845. if (up->new_addrs) {
  846. LL_FOREACH_SAFE(up->new_addrs, cur, tmp)
  847. {
  848. /* Here we need to free pointer as well */
  849. rspamd_inet_address_free(cur->addr);
  850. g_free(cur);
  851. }
  852. }
  853. if (up->addrs.addr) {
  854. g_ptr_array_free(up->addrs.addr, TRUE);
  855. }
  856. #ifdef UPSTREAMS_THREAD_SAFE
  857. rspamd_mutex_free(up->lock);
  858. #endif
  859. if (up->ctx) {
  860. if (ev_can_stop(&up->ev)) {
  861. ev_timer_stop(up->ctx->event_loop, &up->ev);
  862. }
  863. g_queue_delete_link(up->ctx->upstreams, up->ctx_pos);
  864. REF_RELEASE(up->ctx);
  865. }
  866. g_free(up);
  867. }
  868. rspamd_inet_addr_t *
  869. rspamd_upstream_addr_next(struct upstream *up)
  870. {
  871. unsigned int idx, next_idx;
  872. struct upstream_addr_elt *e1, *e2;
  873. do {
  874. idx = up->addrs.cur;
  875. next_idx = (idx + 1) % up->addrs.addr->len;
  876. e1 = g_ptr_array_index(up->addrs.addr, idx);
  877. e2 = g_ptr_array_index(up->addrs.addr, next_idx);
  878. up->addrs.cur = next_idx;
  879. } while (e2->errors > e1->errors);
  880. return e2->addr;
  881. }
  882. rspamd_inet_addr_t *
  883. rspamd_upstream_addr_cur(const struct upstream *up)
  884. {
  885. struct upstream_addr_elt *elt;
  886. elt = g_ptr_array_index(up->addrs.addr, up->addrs.cur);
  887. return elt->addr;
  888. }
  889. const char *
  890. rspamd_upstream_name(struct upstream *up)
  891. {
  892. return up->name;
  893. }
  894. int rspamd_upstream_port(struct upstream *up)
  895. {
  896. struct upstream_addr_elt *elt;
  897. elt = g_ptr_array_index(up->addrs.addr, up->addrs.cur);
  898. return rspamd_inet_address_get_port(elt->addr);
  899. }
  900. gboolean
  901. rspamd_upstreams_add_upstream(struct upstream_list *ups, const char *str,
  902. uint16_t def_port, enum rspamd_upstream_parse_type parse_type,
  903. void *data)
  904. {
  905. struct upstream *upstream;
  906. GPtrArray *addrs = NULL;
  907. unsigned int i, slen;
  908. rspamd_inet_addr_t *addr;
  909. enum rspamd_parse_host_port_result ret = RSPAMD_PARSE_ADDR_FAIL;
  910. upstream = g_malloc0(sizeof(*upstream));
  911. slen = strlen(str);
  912. switch (parse_type) {
  913. case RSPAMD_UPSTREAM_PARSE_DEFAULT:
  914. if (slen > sizeof("service=") &&
  915. RSPAMD_LEN_CHECK_STARTS_WITH(str, slen, "service=")) {
  916. const char *plus_pos, *service_pos, *semicolon_pos;
  917. /* Accept service=srv_name+hostname[:priority] */
  918. service_pos = str + sizeof("service=") - 1;
  919. plus_pos = strchr(service_pos, '+');
  920. if (plus_pos != NULL) {
  921. semicolon_pos = strchr(plus_pos + 1, ':');
  922. if (semicolon_pos) {
  923. upstream->weight = strtoul(semicolon_pos + 1, NULL, 10);
  924. }
  925. else {
  926. semicolon_pos = plus_pos + strlen(plus_pos);
  927. }
  928. /*
  929. * Now our name is _service._tcp.<domain>
  930. * where <domain> is string between semicolon_pos and plus_pos +1
  931. * while service is a string between service_pos and plus_pos
  932. */
  933. unsigned int namelen = (semicolon_pos - (plus_pos + 1)) +
  934. (plus_pos - service_pos) +
  935. (sizeof("tcp") - 1) +
  936. 4;
  937. addrs = g_ptr_array_sized_new(1);
  938. upstream->name = ups->ctx ? rspamd_mempool_alloc(ups->ctx->pool, namelen + 1) : g_malloc(namelen + 1);
  939. rspamd_snprintf(upstream->name, namelen + 1,
  940. "_%*s._tcp.%*s",
  941. (int) (plus_pos - service_pos), service_pos,
  942. (int) (semicolon_pos - (plus_pos + 1)), plus_pos + 1);
  943. upstream->flags |= RSPAMD_UPSTREAM_FLAG_SRV_RESOLVE;
  944. ret = RSPAMD_PARSE_ADDR_RESOLVED;
  945. }
  946. }
  947. else {
  948. ret = rspamd_parse_host_port_priority(str, &addrs,
  949. &upstream->weight,
  950. &upstream->name, def_port,
  951. FALSE,
  952. ups->ctx ? ups->ctx->pool : NULL);
  953. }
  954. break;
  955. case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
  956. addrs = g_ptr_array_sized_new(1);
  957. if (rspamd_parse_inet_address(&addr, str, strlen(str),
  958. RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
  959. if (ups->ctx) {
  960. upstream->name = rspamd_mempool_strdup(ups->ctx->pool, str);
  961. }
  962. else {
  963. upstream->name = g_strdup(str);
  964. }
  965. if (rspamd_inet_address_get_port(addr) == 0) {
  966. rspamd_inet_address_set_port(addr, def_port);
  967. }
  968. g_ptr_array_add(addrs, addr);
  969. ret = RSPAMD_PARSE_ADDR_NUMERIC;
  970. if (ups->ctx) {
  971. rspamd_mempool_add_destructor(ups->ctx->pool,
  972. (rspamd_mempool_destruct_t) rspamd_inet_address_free,
  973. addr);
  974. rspamd_mempool_add_destructor(ups->ctx->pool,
  975. (rspamd_mempool_destruct_t) rspamd_ptr_array_free_hard,
  976. addrs);
  977. }
  978. }
  979. else {
  980. g_ptr_array_free(addrs, TRUE);
  981. }
  982. break;
  983. }
  984. if (ret == RSPAMD_PARSE_ADDR_FAIL) {
  985. g_free(upstream);
  986. return FALSE;
  987. }
  988. else {
  989. upstream->flags |= ups->flags;
  990. if (ret == RSPAMD_PARSE_ADDR_NUMERIC) {
  991. /* Add noresolve flag */
  992. upstream->flags |= RSPAMD_UPSTREAM_FLAG_NORESOLVE;
  993. }
  994. for (i = 0; i < addrs->len; i++) {
  995. addr = g_ptr_array_index(addrs, i);
  996. rspamd_upstream_add_addr(upstream, rspamd_inet_address_copy(addr, NULL));
  997. }
  998. }
  999. if (upstream->weight == 0 && ups->rot_alg == RSPAMD_UPSTREAM_MASTER_SLAVE) {
  1000. /* Special heuristic for master-slave rotation */
  1001. if (ups->ups->len == 0) {
  1002. /* Prioritize the first */
  1003. upstream->weight = 1;
  1004. }
  1005. }
  1006. g_ptr_array_add(ups->ups, upstream);
  1007. upstream->ud = data;
  1008. upstream->cur_weight = upstream->weight;
  1009. upstream->ls = ups;
  1010. REF_INIT_RETAIN(upstream, rspamd_upstream_dtor);
  1011. #ifdef UPSTREAMS_THREAD_SAFE
  1012. upstream->lock = rspamd_mutex_new();
  1013. #endif
  1014. upstream->ctx = ups->ctx;
  1015. if (upstream->ctx) {
  1016. REF_RETAIN(ups->ctx);
  1017. g_queue_push_tail(ups->ctx->upstreams, upstream);
  1018. upstream->ctx_pos = g_queue_peek_tail_link(ups->ctx->upstreams);
  1019. }
  1020. unsigned int h = rspamd_cryptobox_fast_hash(upstream->name,
  1021. strlen(upstream->name), 0);
  1022. memset(upstream->uid, 0, sizeof(upstream->uid));
  1023. rspamd_encode_base32_buf((const unsigned char *) &h, sizeof(h),
  1024. upstream->uid, sizeof(upstream->uid) - 1, RSPAMD_BASE32_DEFAULT);
  1025. msg_debug_upstream("added upstream %s (%s)", upstream->name,
  1026. upstream->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE ? "numeric ip" : "DNS name");
  1027. g_ptr_array_sort(upstream->addrs.addr, rspamd_upstream_addr_sort_func);
  1028. rspamd_upstream_set_active(ups, upstream);
  1029. return TRUE;
  1030. }
  1031. void rspamd_upstreams_set_flags(struct upstream_list *ups,
  1032. enum rspamd_upstream_flag flags)
  1033. {
  1034. ups->flags = flags;
  1035. }
  1036. void rspamd_upstreams_set_rotation(struct upstream_list *ups,
  1037. enum rspamd_upstream_rotation rot)
  1038. {
  1039. ups->rot_alg = rot;
  1040. }
  1041. gboolean
  1042. rspamd_upstream_add_addr(struct upstream *up, rspamd_inet_addr_t *addr)
  1043. {
  1044. struct upstream_addr_elt *elt;
  1045. /*
  1046. * XXX: slow and inefficient
  1047. */
  1048. if (up->addrs.addr == NULL) {
  1049. up->addrs.addr = g_ptr_array_new_full(8, rspamd_upstream_addr_elt_dtor);
  1050. }
  1051. elt = g_malloc0(sizeof(*elt));
  1052. elt->addr = addr;
  1053. g_ptr_array_add(up->addrs.addr, elt);
  1054. g_ptr_array_sort(up->addrs.addr, rspamd_upstream_addr_sort_func);
  1055. return TRUE;
  1056. }
  1057. gboolean
  1058. rspamd_upstreams_parse_line_len(struct upstream_list *ups,
  1059. const char *str, gsize len, uint16_t def_port, void *data)
  1060. {
  1061. const char *end = str + len, *p = str;
  1062. const char *separators = ";, \n\r\t";
  1063. char *tmp;
  1064. unsigned int span_len;
  1065. gboolean ret = FALSE;
  1066. if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "random:")) {
  1067. ups->rot_alg = RSPAMD_UPSTREAM_RANDOM;
  1068. p += sizeof("random:") - 1;
  1069. }
  1070. else if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "master-slave:")) {
  1071. ups->rot_alg = RSPAMD_UPSTREAM_MASTER_SLAVE;
  1072. p += sizeof("master-slave:") - 1;
  1073. }
  1074. else if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "round-robin:")) {
  1075. ups->rot_alg = RSPAMD_UPSTREAM_ROUND_ROBIN;
  1076. p += sizeof("round-robin:") - 1;
  1077. }
  1078. else if (RSPAMD_LEN_CHECK_STARTS_WITH(p, len, "hash:")) {
  1079. ups->rot_alg = RSPAMD_UPSTREAM_HASHED;
  1080. p += sizeof("hash:") - 1;
  1081. }
  1082. while (p < end) {
  1083. span_len = rspamd_memcspn(p, separators, end - p);
  1084. if (span_len > 0) {
  1085. tmp = g_malloc(span_len + 1);
  1086. rspamd_strlcpy(tmp, p, span_len + 1);
  1087. if (rspamd_upstreams_add_upstream(ups, tmp, def_port,
  1088. RSPAMD_UPSTREAM_PARSE_DEFAULT,
  1089. data)) {
  1090. ret = TRUE;
  1091. }
  1092. g_free(tmp);
  1093. }
  1094. p += span_len;
  1095. /* Skip separators */
  1096. if (p < end) {
  1097. p += rspamd_memspn(p, separators, end - p);
  1098. }
  1099. }
  1100. if (!ups->ups_line) {
  1101. ups->ups_line = g_malloc(len + 1);
  1102. rspamd_strlcpy(ups->ups_line, str, len + 1);
  1103. }
  1104. return ret;
  1105. }
  1106. gboolean
  1107. rspamd_upstreams_parse_line(struct upstream_list *ups,
  1108. const char *str, uint16_t def_port, void *data)
  1109. {
  1110. return rspamd_upstreams_parse_line_len(ups, str, strlen(str),
  1111. def_port, data);
  1112. }
  1113. gboolean
  1114. rspamd_upstreams_from_ucl(struct upstream_list *ups,
  1115. const ucl_object_t *in, uint16_t def_port, void *data)
  1116. {
  1117. gboolean ret = FALSE;
  1118. const ucl_object_t *cur;
  1119. ucl_object_iter_t it = NULL;
  1120. it = ucl_object_iterate_new(in);
  1121. while ((cur = ucl_object_iterate_safe(it, true)) != NULL) {
  1122. if (ucl_object_type(cur) == UCL_STRING) {
  1123. ret = rspamd_upstreams_parse_line(ups, ucl_object_tostring(cur),
  1124. def_port, data);
  1125. }
  1126. }
  1127. ucl_object_iterate_free(it);
  1128. return ret;
  1129. }
  1130. void rspamd_upstreams_destroy(struct upstream_list *ups)
  1131. {
  1132. unsigned int i;
  1133. struct upstream *up;
  1134. struct upstream_list_watcher *w, *tmp;
  1135. if (ups != NULL) {
  1136. g_ptr_array_free(ups->alive, TRUE);
  1137. for (i = 0; i < ups->ups->len; i++) {
  1138. up = g_ptr_array_index(ups->ups, i);
  1139. up->ls = NULL;
  1140. REF_RELEASE(up);
  1141. }
  1142. DL_FOREACH_SAFE(ups->watchers, w, tmp)
  1143. {
  1144. if (w->dtor) {
  1145. w->dtor(w->ud);
  1146. }
  1147. g_free(w);
  1148. }
  1149. g_free(ups->ups_line);
  1150. g_ptr_array_free(ups->ups, TRUE);
  1151. #ifdef UPSTREAMS_THREAD_SAFE
  1152. rspamd_mutex_free(ups->lock);
  1153. #endif
  1154. g_free(ups);
  1155. }
  1156. }
  1157. static void
  1158. rspamd_upstream_restore_cb(gpointer elt, gpointer ls)
  1159. {
  1160. struct upstream *up = (struct upstream *) elt;
  1161. struct upstream_list *ups = (struct upstream_list *) ls;
  1162. struct upstream_list_watcher *w;
  1163. /* Here the upstreams list is already locked */
  1164. RSPAMD_UPSTREAM_LOCK(up);
  1165. if (ev_can_stop(&up->ev)) {
  1166. ev_timer_stop(up->ctx->event_loop, &up->ev);
  1167. }
  1168. g_ptr_array_add(ups->alive, up);
  1169. up->active_idx = ups->alive->len - 1;
  1170. RSPAMD_UPSTREAM_UNLOCK(up);
  1171. DL_FOREACH(up->ls->watchers, w)
  1172. {
  1173. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
  1174. w->func(up, RSPAMD_UPSTREAM_WATCH_ONLINE, up->errors, w->ud);
  1175. }
  1176. }
  1177. /* For revive event */
  1178. g_assert(up->ref.refcount > 1);
  1179. REF_RELEASE(up);
  1180. }
  1181. static struct upstream *
  1182. rspamd_upstream_get_random(struct upstream_list *ups,
  1183. struct upstream *except)
  1184. {
  1185. for (;;) {
  1186. unsigned int idx = ottery_rand_range(ups->alive->len - 1);
  1187. struct upstream *up;
  1188. up = g_ptr_array_index(ups->alive, idx);
  1189. if (except && up == except) {
  1190. continue;
  1191. }
  1192. return up;
  1193. }
  1194. }
  1195. static struct upstream *
  1196. rspamd_upstream_get_round_robin(struct upstream_list *ups,
  1197. struct upstream *except,
  1198. gboolean use_cur)
  1199. {
  1200. unsigned int max_weight = 0, min_checked = G_MAXUINT;
  1201. struct upstream *up = NULL, *selected = NULL, *min_checked_sel = NULL;
  1202. unsigned int i;
  1203. /* Select upstream with the maximum cur_weight */
  1204. RSPAMD_UPSTREAM_LOCK(ups);
  1205. for (i = 0; i < ups->alive->len; i++) {
  1206. up = g_ptr_array_index(ups->alive, i);
  1207. if (except != NULL && up == except) {
  1208. continue;
  1209. }
  1210. if (use_cur) {
  1211. if (up->cur_weight > max_weight) {
  1212. selected = up;
  1213. max_weight = up->cur_weight;
  1214. }
  1215. }
  1216. else {
  1217. if (up->weight > max_weight) {
  1218. selected = up;
  1219. max_weight = up->weight;
  1220. }
  1221. }
  1222. /*
  1223. * This code is used when all upstreams have zero weight
  1224. * The logic is to select least currently used upstream and penalise
  1225. * upstream with errors. The error penalty should no be too high
  1226. * to avoid sudden traffic drop in this case.
  1227. */
  1228. if (up->checked + up->errors * 2 < min_checked) {
  1229. min_checked_sel = up;
  1230. min_checked = up->checked;
  1231. }
  1232. }
  1233. if (max_weight == 0) {
  1234. /* All upstreams have zero weight */
  1235. if (min_checked > G_MAXUINT / 2) {
  1236. /* Reset all checked counters to avoid overflow */
  1237. for (i = 0; i < ups->alive->len; i++) {
  1238. up = g_ptr_array_index(ups->alive, i);
  1239. up->checked = 0;
  1240. }
  1241. }
  1242. selected = min_checked_sel;
  1243. }
  1244. if (use_cur && selected) {
  1245. if (selected->cur_weight > 0) {
  1246. selected->cur_weight--;
  1247. }
  1248. else {
  1249. selected->cur_weight = selected->weight;
  1250. }
  1251. }
  1252. RSPAMD_UPSTREAM_UNLOCK(ups);
  1253. return selected;
  1254. }
  1255. /*
  1256. * The key idea of this function is obtained from the following paper:
  1257. * A Fast, Minimal Memory, Consistent Hash Algorithm
  1258. * John Lamping, Eric Veach
  1259. *
  1260. * http://arxiv.org/abs/1406.2294
  1261. */
  1262. static uint32_t
  1263. rspamd_consistent_hash(uint64_t key, uint32_t nbuckets)
  1264. {
  1265. int64_t b = -1, j = 0;
  1266. while (j < nbuckets) {
  1267. b = j;
  1268. key *= 2862933555777941757ULL + 1;
  1269. j = (b + 1) * (double) (1ULL << 31) / (double) ((key >> 33) + 1ULL);
  1270. }
  1271. return b;
  1272. }
  1273. static struct upstream *
  1274. rspamd_upstream_get_hashed(struct upstream_list *ups,
  1275. struct upstream *except,
  1276. const uint8_t *key, unsigned int keylen)
  1277. {
  1278. uint64_t k;
  1279. uint32_t idx;
  1280. static const unsigned int max_tries = 20;
  1281. struct upstream *up = NULL;
  1282. /* Generate 64 bits input key */
  1283. k = rspamd_cryptobox_fast_hash_specific(RSPAMD_CRYPTOBOX_XXHASH64,
  1284. key, keylen, ups->hash_seed);
  1285. RSPAMD_UPSTREAM_LOCK(ups);
  1286. /*
  1287. * Select new upstream from all upstreams
  1288. */
  1289. for (unsigned int i = 0; i < max_tries; i++) {
  1290. idx = rspamd_consistent_hash(k, ups->ups->len);
  1291. up = g_ptr_array_index(ups->ups, idx);
  1292. if (up->active_idx < 0 || (except != NULL && up == except)) {
  1293. /* Found inactive or excluded upstream */
  1294. k = mum_hash_step(k, ups->hash_seed);
  1295. }
  1296. else {
  1297. break;
  1298. }
  1299. }
  1300. RSPAMD_UPSTREAM_UNLOCK(ups);
  1301. if (up->active_idx >= 0) {
  1302. return up;
  1303. }
  1304. /* We failed to find any active upstream */
  1305. up = rspamd_upstream_get_random(ups, except);
  1306. msg_info("failed to find hashed upstream for %s, fallback to random: %s",
  1307. ups->ups_line, up->name);
  1308. return up;
  1309. }
  1310. static struct upstream *
  1311. rspamd_upstream_get_common(struct upstream_list *ups,
  1312. struct upstream *except,
  1313. enum rspamd_upstream_rotation default_type,
  1314. const unsigned char *key, gsize keylen,
  1315. gboolean forced)
  1316. {
  1317. enum rspamd_upstream_rotation type;
  1318. struct upstream *up = NULL;
  1319. RSPAMD_UPSTREAM_LOCK(ups);
  1320. if (ups->alive->len == 0) {
  1321. /* We have no upstreams alive */
  1322. msg_warn("there are no alive upstreams left for %s, revive all of them",
  1323. ups->ups_line);
  1324. g_ptr_array_foreach(ups->ups, rspamd_upstream_restore_cb, ups);
  1325. }
  1326. RSPAMD_UPSTREAM_UNLOCK(ups);
  1327. if (ups->alive->len == 1 && default_type != RSPAMD_UPSTREAM_SEQUENTIAL) {
  1328. /* Fast path */
  1329. up = g_ptr_array_index(ups->alive, 0);
  1330. goto end;
  1331. }
  1332. if (!forced) {
  1333. type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type;
  1334. }
  1335. else {
  1336. type = default_type != RSPAMD_UPSTREAM_UNDEF ? default_type : ups->rot_alg;
  1337. }
  1338. if (type == RSPAMD_UPSTREAM_HASHED && (keylen == 0 || key == NULL)) {
  1339. /* Cannot use hashed rotation when no key is specified, switch to random */
  1340. type = RSPAMD_UPSTREAM_RANDOM;
  1341. }
  1342. switch (type) {
  1343. default:
  1344. case RSPAMD_UPSTREAM_RANDOM:
  1345. up = rspamd_upstream_get_random(ups, except);
  1346. break;
  1347. case RSPAMD_UPSTREAM_HASHED:
  1348. up = rspamd_upstream_get_hashed(ups, except, key, keylen);
  1349. break;
  1350. case RSPAMD_UPSTREAM_ROUND_ROBIN:
  1351. up = rspamd_upstream_get_round_robin(ups, except, TRUE);
  1352. break;
  1353. case RSPAMD_UPSTREAM_MASTER_SLAVE:
  1354. up = rspamd_upstream_get_round_robin(ups, except, FALSE);
  1355. break;
  1356. case RSPAMD_UPSTREAM_SEQUENTIAL:
  1357. if (ups->cur_elt >= ups->alive->len) {
  1358. ups->cur_elt = 0;
  1359. return NULL;
  1360. }
  1361. up = g_ptr_array_index(ups->alive, ups->cur_elt++);
  1362. break;
  1363. }
  1364. end:
  1365. if (up) {
  1366. up->checked++;
  1367. }
  1368. return up;
  1369. }
  1370. struct upstream *
  1371. rspamd_upstream_get(struct upstream_list *ups,
  1372. enum rspamd_upstream_rotation default_type,
  1373. const unsigned char *key, gsize keylen)
  1374. {
  1375. return rspamd_upstream_get_common(ups, NULL, default_type, key, keylen, FALSE);
  1376. }
  1377. struct upstream *
  1378. rspamd_upstream_get_forced(struct upstream_list *ups,
  1379. enum rspamd_upstream_rotation forced_type,
  1380. const unsigned char *key, gsize keylen)
  1381. {
  1382. return rspamd_upstream_get_common(ups, NULL, forced_type, key, keylen, TRUE);
  1383. }
  1384. struct upstream *rspamd_upstream_get_except(struct upstream_list *ups,
  1385. struct upstream *except,
  1386. enum rspamd_upstream_rotation default_type,
  1387. const unsigned char *key, gsize keylen)
  1388. {
  1389. return rspamd_upstream_get_common(ups, except, default_type, key, keylen, FALSE);
  1390. }
  1391. void rspamd_upstream_reresolve(struct upstream_ctx *ctx)
  1392. {
  1393. GList *cur;
  1394. struct upstream *up;
  1395. cur = ctx->upstreams->head;
  1396. while (cur) {
  1397. up = cur->data;
  1398. REF_RETAIN(up);
  1399. rspamd_upstream_resolve_addrs(up->ls, up);
  1400. REF_RELEASE(up);
  1401. cur = g_list_next(cur);
  1402. }
  1403. }
  1404. gpointer
  1405. rspamd_upstream_set_data(struct upstream *up, gpointer data)
  1406. {
  1407. gpointer prev_data = up->data;
  1408. up->data = data;
  1409. return prev_data;
  1410. }
  1411. gpointer
  1412. rspamd_upstream_get_data(struct upstream *up)
  1413. {
  1414. return up->data;
  1415. }
  1416. void rspamd_upstreams_foreach(struct upstream_list *ups,
  1417. rspamd_upstream_traverse_func cb, void *ud)
  1418. {
  1419. struct upstream *up;
  1420. unsigned int i;
  1421. for (i = 0; i < ups->ups->len; i++) {
  1422. up = g_ptr_array_index(ups->ups, i);
  1423. cb(up, i, ud);
  1424. }
  1425. }
  1426. void rspamd_upstreams_set_limits(struct upstream_list *ups,
  1427. double revive_time,
  1428. double revive_jitter,
  1429. double error_time,
  1430. double dns_timeout,
  1431. unsigned int max_errors,
  1432. unsigned int dns_retransmits)
  1433. {
  1434. struct upstream_limits *nlimits;
  1435. g_assert(ups != NULL);
  1436. nlimits = rspamd_mempool_alloc(ups->ctx->pool, sizeof(*nlimits));
  1437. memcpy(nlimits, ups->limits, sizeof(*nlimits));
  1438. if (!isnan(revive_time)) {
  1439. nlimits->revive_time = revive_time;
  1440. }
  1441. if (!isnan(revive_jitter)) {
  1442. nlimits->revive_jitter = revive_jitter;
  1443. }
  1444. if (!isnan(error_time)) {
  1445. nlimits->error_time = error_time;
  1446. }
  1447. if (!isnan(dns_timeout)) {
  1448. nlimits->dns_timeout = dns_timeout;
  1449. }
  1450. if (max_errors > 0) {
  1451. nlimits->max_errors = max_errors;
  1452. }
  1453. if (dns_retransmits > 0) {
  1454. nlimits->dns_retransmits = dns_retransmits;
  1455. }
  1456. ups->limits = nlimits;
  1457. }
  1458. void rspamd_upstreams_add_watch_callback(struct upstream_list *ups,
  1459. enum rspamd_upstreams_watch_event events,
  1460. rspamd_upstream_watch_func func,
  1461. GFreeFunc dtor,
  1462. gpointer ud)
  1463. {
  1464. struct upstream_list_watcher *nw;
  1465. g_assert((events & RSPAMD_UPSTREAM_WATCH_ALL) != 0);
  1466. nw = g_malloc(sizeof(*nw));
  1467. nw->func = func;
  1468. nw->events_mask = events;
  1469. nw->ud = ud;
  1470. nw->dtor = dtor;
  1471. DL_APPEND(ups->watchers, nw);
  1472. }
  1473. struct upstream *
  1474. rspamd_upstream_ref(struct upstream *up)
  1475. {
  1476. REF_RETAIN(up);
  1477. return up;
  1478. }
  1479. void rspamd_upstream_unref(struct upstream *up)
  1480. {
  1481. REF_RELEASE(up);
  1482. }