You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

upstream.c 27KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
#include "config.h"
#include "upstream.h"
#include "ottery.h"
#include "ref.h"
#include "cfg_file.h"
#include "rdns.h"
#include "cryptobox.h"
#include "utlist.h"
#include <math.h>
#include <stdint.h>
  25. struct upstream_inet_addr_entry {
  26. rspamd_inet_addr_t *addr;
  27. struct upstream_inet_addr_entry *next;
  28. };
  29. struct upstream_addr_elt {
  30. rspamd_inet_addr_t *addr;
  31. guint errors;
  32. };
  33. struct upstream_list_watcher {
  34. rspamd_upstream_watch_func func;
  35. GFreeFunc dtor;
  36. gpointer ud;
  37. enum rspamd_upstreams_watch_event events_mask;
  38. struct upstream_list_watcher *next, *prev;
  39. };
  40. struct upstream {
  41. guint weight;
  42. guint cur_weight;
  43. guint errors;
  44. guint checked;
  45. guint dns_requests;
  46. gint active_idx;
  47. gchar *name;
  48. struct event ev;
  49. gdouble last_fail;
  50. gpointer ud;
  51. struct upstream_list *ls;
  52. GList *ctx_pos;
  53. struct upstream_ctx *ctx;
  54. struct {
  55. GPtrArray *addr; /* struct upstream_addr_elt */
  56. guint cur;
  57. } addrs;
  58. struct upstream_inet_addr_entry *new_addrs;
  59. rspamd_mutex_t *lock;
  60. gpointer data;
  61. ref_entry_t ref;
  62. };
  63. struct upstream_limits {
  64. gdouble revive_time;
  65. gdouble revive_jitter;
  66. gdouble error_time;
  67. gdouble dns_timeout;
  68. guint max_errors;
  69. guint dns_retransmits;
  70. };
  71. struct upstream_list {
  72. struct upstream_ctx *ctx;
  73. GPtrArray *ups;
  74. GPtrArray *alive;
  75. struct upstream_list_watcher *watchers;
  76. rspamd_mutex_t *lock;
  77. guint64 hash_seed;
  78. struct upstream_limits limits;
  79. guint cur_elt;
  80. enum rspamd_upstream_flag flags;
  81. enum rspamd_upstream_rotation rot_alg;
  82. };
  83. struct upstream_ctx {
  84. struct rdns_resolver *res;
  85. struct event_base *ev_base;
  86. struct upstream_limits limits;
  87. GQueue *upstreams;
  88. gboolean configured;
  89. rspamd_mempool_t *pool;
  90. ref_entry_t ref;
  91. };
  92. #ifndef UPSTREAMS_THREAD_SAFE
  93. #define RSPAMD_UPSTREAM_LOCK(x) do { } while (0)
  94. #define RSPAMD_UPSTREAM_UNLOCK(x) do { } while (0)
  95. #else
  96. #define RSPAMD_UPSTREAM_LOCK(x) rspamd_mutex_lock(x)
  97. #define RSPAMD_UPSTREAM_UNLOCK(x) rspamd_mutex_unlock(x)
  98. #endif
  99. /* 4 errors in 10 seconds */
  100. static guint default_max_errors = 4;
  101. static gdouble default_revive_time = 60;
  102. static gdouble default_revive_jitter = 0.4;
  103. static gdouble default_error_time = 10;
  104. static gdouble default_dns_timeout = 1.0;
  105. static guint default_dns_retransmits = 2;
  106. void
  107. rspamd_upstreams_library_config (struct rspamd_config *cfg,
  108. struct upstream_ctx *ctx,
  109. struct event_base *ev_base,
  110. struct rdns_resolver *resolver)
  111. {
  112. g_assert (ctx != NULL);
  113. g_assert (cfg != NULL);
  114. if (cfg->upstream_error_time) {
  115. ctx->limits.error_time = cfg->upstream_error_time;
  116. }
  117. if (cfg->upstream_max_errors) {
  118. ctx->limits.max_errors = cfg->upstream_max_errors;
  119. }
  120. if (cfg->upstream_revive_time) {
  121. ctx->limits.revive_time = cfg->upstream_max_errors;
  122. }
  123. if (cfg->dns_retransmits) {
  124. ctx->limits.dns_retransmits = cfg->dns_retransmits;
  125. }
  126. if (cfg->dns_timeout) {
  127. ctx->limits.dns_timeout = cfg->dns_timeout;
  128. }
  129. ctx->ev_base = ev_base;
  130. ctx->res = resolver;
  131. ctx->configured = TRUE;
  132. }
  133. static void
  134. rspamd_upstream_ctx_dtor (struct upstream_ctx *ctx)
  135. {
  136. GList *cur;
  137. struct upstream *u;
  138. cur = ctx->upstreams->head;
  139. while (cur) {
  140. u = cur->data;
  141. u->ctx = NULL;
  142. u->ctx_pos = NULL;
  143. cur = g_list_next (cur);
  144. }
  145. g_queue_free (ctx->upstreams);
  146. rspamd_mempool_delete (ctx->pool);
  147. g_free (ctx);
  148. }
  149. void
  150. rspamd_upstreams_library_unref (struct upstream_ctx *ctx)
  151. {
  152. REF_RELEASE (ctx);
  153. }
  154. struct upstream_ctx *
  155. rspamd_upstreams_library_init (void)
  156. {
  157. struct upstream_ctx *ctx;
  158. ctx = g_malloc0 (sizeof (*ctx));
  159. ctx->limits.error_time = default_error_time;
  160. ctx->limits.max_errors = default_max_errors;
  161. ctx->limits.dns_retransmits = default_dns_retransmits;
  162. ctx->limits.dns_timeout = default_dns_timeout;
  163. ctx->limits.revive_jitter = default_revive_jitter;
  164. ctx->limits.revive_time = default_revive_time;
  165. ctx->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (),
  166. "upstreams");
  167. ctx->upstreams = g_queue_new ();
  168. REF_INIT_RETAIN (ctx, rspamd_upstream_ctx_dtor);
  169. return ctx;
  170. }
  171. static gint
  172. rspamd_upstream_af_to_weight (const rspamd_inet_addr_t *addr)
  173. {
  174. int ret;
  175. switch (rspamd_inet_address_get_af (addr)) {
  176. case AF_UNIX:
  177. ret = 2;
  178. break;
  179. case AF_INET:
  180. ret = 1;
  181. break;
  182. default:
  183. ret = 0;
  184. break;
  185. }
  186. return ret;
  187. }
  188. /*
  189. * Select IPv4 addresses before IPv6
  190. */
  191. static gint
  192. rspamd_upstream_addr_sort_func (gconstpointer a, gconstpointer b)
  193. {
  194. const struct upstream_addr_elt **ip1 = (const struct upstream_addr_elt **)a,
  195. **ip2 = (const struct upstream_addr_elt **)b;
  196. gint w1, w2;
  197. w1 = rspamd_upstream_af_to_weight ((*ip1)->addr);
  198. w2 = rspamd_upstream_af_to_weight ((*ip2)->addr);
  199. return w2 - w1;
  200. }
  201. static void
  202. rspamd_upstream_set_active (struct upstream_list *ls, struct upstream *up)
  203. {
  204. RSPAMD_UPSTREAM_LOCK (ls->lock);
  205. g_ptr_array_add (ls->alive, up);
  206. up->active_idx = ls->alive->len - 1;
  207. RSPAMD_UPSTREAM_UNLOCK (ls->lock);
  208. }
  209. static void
  210. rspamd_upstream_addr_elt_dtor (gpointer a)
  211. {
  212. struct upstream_addr_elt *elt = a;
  213. if (elt) {
  214. rspamd_inet_address_free (elt->addr);
  215. g_free (elt);
  216. }
  217. }
  218. static void
  219. rspamd_upstream_update_addrs (struct upstream *up)
  220. {
  221. guint addr_cnt, i, port;
  222. gboolean seen_addr, reset_errors = FALSE;
  223. struct upstream_inet_addr_entry *cur, *tmp;
  224. GPtrArray *new_addrs;
  225. struct upstream_addr_elt *addr_elt, *naddr;
  226. /*
  227. * We need first of all get the saved port, since DNS gives us no
  228. * idea about what port has been used previously
  229. */
  230. RSPAMD_UPSTREAM_LOCK (up->lock);
  231. if (up->addrs.addr->len > 0 && up->new_addrs) {
  232. addr_elt = g_ptr_array_index (up->addrs.addr, 0);
  233. port = rspamd_inet_address_get_port (addr_elt->addr);
  234. /* Now calculate new addrs count */
  235. addr_cnt = 0;
  236. LL_FOREACH (up->new_addrs, cur) {
  237. addr_cnt++;
  238. }
  239. /* At 10% probability reset errors on addr elements */
  240. if (rspamd_random_double_fast () > 0.9) {
  241. reset_errors = TRUE;
  242. }
  243. new_addrs = g_ptr_array_new_full (addr_cnt, rspamd_upstream_addr_elt_dtor);
  244. /* Copy addrs back */
  245. LL_FOREACH (up->new_addrs, cur) {
  246. seen_addr = FALSE;
  247. naddr = NULL;
  248. /* Ports are problematic, set to compare in the next block */
  249. rspamd_inet_address_set_port (cur->addr, port);
  250. PTR_ARRAY_FOREACH (up->addrs.addr, i, addr_elt) {
  251. if (rspamd_inet_address_compare (addr_elt->addr, cur->addr) == 0) {
  252. naddr = g_malloc0 (sizeof (*naddr));
  253. naddr->addr = cur->addr;
  254. naddr->errors = reset_errors ? 0 : addr_elt->errors;
  255. seen_addr = TRUE;
  256. break;
  257. }
  258. }
  259. if (!seen_addr) {
  260. naddr = g_malloc0 (sizeof (*naddr));
  261. naddr->addr = cur->addr;
  262. naddr->errors = 0;
  263. }
  264. g_ptr_array_add (new_addrs, naddr);
  265. }
  266. /* Free old addresses */
  267. g_ptr_array_free (up->addrs.addr, TRUE);
  268. up->addrs.cur = 0;
  269. up->addrs.addr = new_addrs;
  270. g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
  271. }
  272. LL_FOREACH_SAFE (up->new_addrs, cur, tmp) {
  273. /* Do not free inet address pointer since it has been transferred to up */
  274. g_free (cur);
  275. }
  276. up->new_addrs = NULL;
  277. RSPAMD_UPSTREAM_UNLOCK (up->lock);
  278. }
  279. static void
  280. rspamd_upstream_dns_cb (struct rdns_reply *reply, void *arg)
  281. {
  282. struct upstream *up = (struct upstream *)arg;
  283. struct rdns_reply_entry *entry;
  284. struct upstream_inet_addr_entry *up_ent;
  285. if (reply->code == RDNS_RC_NOERROR) {
  286. entry = reply->entries;
  287. RSPAMD_UPSTREAM_LOCK (up->lock);
  288. while (entry) {
  289. if (entry->type == RDNS_REQUEST_A) {
  290. up_ent = g_malloc0 (sizeof (*up_ent));
  291. up_ent->addr = rspamd_inet_address_new (AF_INET,
  292. &entry->content.a.addr);
  293. LL_PREPEND (up->new_addrs, up_ent);
  294. }
  295. else if (entry->type == RDNS_REQUEST_AAAA) {
  296. up_ent = g_malloc0 (sizeof (*up_ent));
  297. up_ent->addr = rspamd_inet_address_new (AF_INET6,
  298. &entry->content.aaa.addr);
  299. LL_PREPEND (up->new_addrs, up_ent);
  300. }
  301. entry = entry->next;
  302. }
  303. RSPAMD_UPSTREAM_UNLOCK (up->lock);
  304. }
  305. up->dns_requests--;
  306. if (up->dns_requests == 0) {
  307. rspamd_upstream_update_addrs (up);
  308. }
  309. REF_RELEASE (up);
  310. }
  311. static void
  312. rspamd_upstream_revive_cb (int fd, short what, void *arg)
  313. {
  314. struct upstream *up = (struct upstream *)arg;
  315. RSPAMD_UPSTREAM_LOCK (up->lock);
  316. event_del (&up->ev);
  317. if (up->ls) {
  318. rspamd_upstream_set_active (up->ls, up);
  319. }
  320. RSPAMD_UPSTREAM_UNLOCK (up->lock);
  321. REF_RELEASE (up);
  322. }
  323. static void
  324. rspamd_upstream_resolve_addrs (const struct upstream_list *ls,
  325. struct upstream *up)
  326. {
  327. if (up->ctx->res != NULL &&
  328. up->ctx->configured &&
  329. up->dns_requests == 0 &&
  330. !(ls->flags & RSPAMD_UPSTREAM_FLAG_NORESOLVE)) {
  331. /* Resolve name of the upstream one more time */
  332. if (up->name[0] != '/') {
  333. if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
  334. ls->limits.dns_timeout, ls->limits.dns_retransmits,
  335. 1, up->name, RDNS_REQUEST_A) != NULL) {
  336. up->dns_requests ++;
  337. REF_RETAIN (up);
  338. }
  339. if (rdns_make_request_full (up->ctx->res, rspamd_upstream_dns_cb, up,
  340. ls->limits.dns_timeout, ls->limits.dns_retransmits,
  341. 1, up->name, RDNS_REQUEST_AAAA) != NULL) {
  342. up->dns_requests ++;
  343. REF_RETAIN (up);
  344. }
  345. }
  346. }
  347. }
  348. static void
  349. rspamd_upstream_set_inactive (struct upstream_list *ls, struct upstream *up)
  350. {
  351. gdouble ntim;
  352. guint i;
  353. struct upstream *cur;
  354. struct timeval tv;
  355. struct upstream_list_watcher *w;
  356. RSPAMD_UPSTREAM_LOCK (ls->lock);
  357. g_ptr_array_remove_index (ls->alive, up->active_idx);
  358. up->active_idx = -1;
  359. /* We need to update all indicies */
  360. for (i = 0; i < ls->alive->len; i ++) {
  361. cur = g_ptr_array_index (ls->alive, i);
  362. cur->active_idx = i;
  363. }
  364. if (up->ctx) {
  365. rspamd_upstream_resolve_addrs (ls, up);
  366. REF_RETAIN (up);
  367. evtimer_set (&up->ev, rspamd_upstream_revive_cb, up);
  368. if (up->ctx->ev_base != NULL && up->ctx->configured) {
  369. event_base_set (up->ctx->ev_base, &up->ev);
  370. }
  371. ntim = rspamd_time_jitter (ls->limits.revive_time,
  372. ls->limits.revive_jitter);
  373. double_to_tv (ntim, &tv);
  374. event_add (&up->ev, &tv);
  375. }
  376. DL_FOREACH (up->ls->watchers, w) {
  377. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_OFFLINE) {
  378. w->func (up, RSPAMD_UPSTREAM_WATCH_OFFLINE, up->errors, w->ud);
  379. }
  380. }
  381. RSPAMD_UPSTREAM_UNLOCK (ls->lock);
  382. }
  383. void
  384. rspamd_upstream_fail (struct upstream *up, gboolean addr_failure)
  385. {
  386. gdouble error_rate, max_error_rate;
  387. gdouble sec_last, sec_cur;
  388. struct upstream_addr_elt *addr_elt;
  389. struct upstream_list_watcher *w;
  390. if (up->ctx && up->active_idx != -1) {
  391. sec_cur = rspamd_get_ticks (FALSE);
  392. RSPAMD_UPSTREAM_LOCK (up->lock);
  393. if (up->errors == 0) {
  394. /* We have the first error */
  395. up->last_fail = sec_cur;
  396. up->errors = 1;
  397. DL_FOREACH (up->ls->watchers, w) {
  398. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
  399. w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, 1, w->ud);
  400. }
  401. }
  402. }
  403. else {
  404. sec_last = up->last_fail;
  405. if (sec_cur >= sec_last) {
  406. up->errors ++;
  407. DL_FOREACH (up->ls->watchers, w) {
  408. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_FAILURE) {
  409. w->func (up, RSPAMD_UPSTREAM_WATCH_FAILURE, up->errors, w->ud);
  410. }
  411. }
  412. if (sec_cur > sec_last) {
  413. error_rate = ((gdouble)up->errors) / (sec_cur - sec_last);
  414. max_error_rate = ((gdouble)up->ls->limits.max_errors) /
  415. up->ls->limits.error_time;
  416. }
  417. else {
  418. error_rate = 1;
  419. max_error_rate = 0;
  420. }
  421. if (error_rate > max_error_rate) {
  422. /* Remove upstream from the active list */
  423. if (up->ls->ups->len > 1) {
  424. up->errors = 0;
  425. rspamd_upstream_set_inactive (up->ls, up);
  426. }
  427. else {
  428. /* Just re-resolve addresses */
  429. if (sec_cur - sec_last > up->ls->limits.revive_time) {
  430. up->errors = 0;
  431. rspamd_upstream_resolve_addrs (up->ls, up);
  432. }
  433. }
  434. }
  435. }
  436. }
  437. if (addr_failure) {
  438. /* Also increase count of errors for this specific address */
  439. if (up->addrs.addr) {
  440. addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
  441. addr_elt->errors++;
  442. }
  443. }
  444. RSPAMD_UPSTREAM_UNLOCK (up->lock);
  445. }
  446. }
  447. void
  448. rspamd_upstream_ok (struct upstream *up)
  449. {
  450. struct upstream_addr_elt *addr_elt;
  451. struct upstream_list_watcher *w;
  452. RSPAMD_UPSTREAM_LOCK (up->lock);
  453. if (up->errors > 0 && up->active_idx != -1) {
  454. /* We touch upstream if and only if it is active */
  455. up->errors = 0;
  456. if (up->addrs.addr) {
  457. addr_elt = g_ptr_array_index (up->addrs.addr, up->addrs.cur);
  458. addr_elt->errors = 0;
  459. }
  460. DL_FOREACH (up->ls->watchers, w) {
  461. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_SUCCESS) {
  462. w->func (up, RSPAMD_UPSTREAM_WATCH_SUCCESS, 0, w->ud);
  463. }
  464. }
  465. }
  466. RSPAMD_UPSTREAM_UNLOCK (up->lock);
  467. }
  468. void
  469. rspamd_upstream_set_weight (struct upstream *up, guint weight)
  470. {
  471. RSPAMD_UPSTREAM_LOCK (up->lock);
  472. up->weight = weight;
  473. RSPAMD_UPSTREAM_UNLOCK (up->lock);
  474. }
  475. #define SEED_CONSTANT 0xa574de7df64e9b9dULL
  476. struct upstream_list*
  477. rspamd_upstreams_create (struct upstream_ctx *ctx)
  478. {
  479. struct upstream_list *ls;
  480. ls = g_malloc0 (sizeof (*ls));
  481. ls->hash_seed = SEED_CONSTANT;
  482. ls->ups = g_ptr_array_new ();
  483. ls->alive = g_ptr_array_new ();
  484. ls->lock = rspamd_mutex_new ();
  485. ls->cur_elt = 0;
  486. ls->ctx = ctx;
  487. ls->rot_alg = RSPAMD_UPSTREAM_UNDEF;
  488. if (ctx) {
  489. ls->limits = ctx->limits;
  490. }
  491. else {
  492. ls->limits.error_time = default_error_time;
  493. ls->limits.max_errors = default_max_errors;
  494. ls->limits.dns_retransmits = default_dns_retransmits;
  495. ls->limits.dns_timeout = default_dns_timeout;
  496. ls->limits.revive_jitter = default_revive_jitter;
  497. ls->limits.revive_time = default_revive_time;
  498. }
  499. return ls;
  500. }
  501. gsize
  502. rspamd_upstreams_count (struct upstream_list *ups)
  503. {
  504. return ups != NULL ? ups->ups->len : 0;
  505. }
  506. gsize
  507. rspamd_upstreams_alive (struct upstream_list *ups)
  508. {
  509. return ups != NULL ? ups->alive->len : 0;
  510. }
  511. static void
  512. rspamd_upstream_dtor (struct upstream *up)
  513. {
  514. struct upstream_inet_addr_entry *cur, *tmp;
  515. if (up->new_addrs) {
  516. LL_FOREACH_SAFE(up->new_addrs, cur, tmp) {
  517. /* Here we need to free pointer as well */
  518. rspamd_inet_address_free (cur->addr);
  519. g_free (cur);
  520. }
  521. }
  522. if (up->addrs.addr) {
  523. g_ptr_array_free (up->addrs.addr, TRUE);
  524. }
  525. rspamd_mutex_free (up->lock);
  526. if (up->ctx) {
  527. g_queue_delete_link (up->ctx->upstreams, up->ctx_pos);
  528. REF_RELEASE (up->ctx);
  529. }
  530. g_free (up);
  531. }
  532. rspamd_inet_addr_t*
  533. rspamd_upstream_addr (struct upstream *up)
  534. {
  535. guint idx, next_idx;
  536. struct upstream_addr_elt *e1, *e2;
  537. do {
  538. idx = up->addrs.cur;
  539. next_idx = (idx + 1) % up->addrs.addr->len;
  540. e1 = g_ptr_array_index (up->addrs.addr, idx);
  541. e2 = g_ptr_array_index (up->addrs.addr, next_idx);
  542. up->addrs.cur = next_idx;
  543. } while (e2->errors > e1->errors);
  544. return e2->addr;
  545. }
  546. const gchar*
  547. rspamd_upstream_name (struct upstream *up)
  548. {
  549. return up->name;
  550. }
  551. gboolean
  552. rspamd_upstreams_add_upstream (struct upstream_list *ups, const gchar *str,
  553. guint16 def_port, enum rspamd_upstream_parse_type parse_type,
  554. void *data)
  555. {
  556. struct upstream *up;
  557. GPtrArray *addrs = NULL;
  558. guint i;
  559. rspamd_inet_addr_t *addr;
  560. gboolean ret = FALSE;
  561. up = g_malloc0 (sizeof (*up));
  562. switch (parse_type) {
  563. case RSPAMD_UPSTREAM_PARSE_DEFAULT:
  564. ret = rspamd_parse_host_port_priority (str, &addrs,
  565. &up->weight,
  566. &up->name, def_port, ups->ctx ? ups->ctx->pool : NULL);
  567. break;
  568. case RSPAMD_UPSTREAM_PARSE_NAMESERVER:
  569. addrs = g_ptr_array_sized_new (1);
  570. ret = rspamd_parse_inet_address (&addr, str, strlen (str));
  571. if (ups->ctx) {
  572. up->name = rspamd_mempool_strdup (ups->ctx->pool, str);
  573. }
  574. else {
  575. up->name = g_strdup (str);
  576. }
  577. if (ret) {
  578. if (rspamd_inet_address_get_port (addr) == 0) {
  579. rspamd_inet_address_set_port (addr, def_port);
  580. }
  581. g_ptr_array_add (addrs, addr);
  582. if (ups->ctx) {
  583. rspamd_mempool_add_destructor (ups->ctx->pool,
  584. (rspamd_mempool_destruct_t) rspamd_inet_address_free,
  585. addr);
  586. rspamd_mempool_add_destructor (ups->ctx->pool,
  587. (rspamd_mempool_destruct_t) rspamd_ptr_array_free_hard,
  588. addrs);
  589. }
  590. }
  591. else {
  592. g_ptr_array_free (addrs, TRUE);
  593. }
  594. break;
  595. }
  596. if (!ret) {
  597. g_free (up);
  598. return FALSE;
  599. }
  600. else {
  601. for (i = 0; i < addrs->len; i ++) {
  602. addr = g_ptr_array_index (addrs, i);
  603. rspamd_upstream_add_addr (up, rspamd_inet_address_copy (addr));
  604. }
  605. }
  606. if (up->weight == 0 && ups->rot_alg == RSPAMD_UPSTREAM_MASTER_SLAVE) {
  607. /* Special heuristic for master-slave rotation */
  608. if (ups->ups->len == 0) {
  609. /* Prioritize the first */
  610. up->weight = 1;
  611. }
  612. }
  613. g_ptr_array_add (ups->ups, up);
  614. up->ud = data;
  615. up->cur_weight = up->weight;
  616. up->ls = ups;
  617. REF_INIT_RETAIN (up, rspamd_upstream_dtor);
  618. up->lock = rspamd_mutex_new ();
  619. up->ctx = ups->ctx;
  620. if (up->ctx) {
  621. REF_RETAIN (ups->ctx);
  622. g_queue_push_tail (ups->ctx->upstreams, up);
  623. up->ctx_pos = g_queue_peek_tail_link (ups->ctx->upstreams);
  624. }
  625. g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
  626. rspamd_upstream_set_active (ups, up);
  627. return TRUE;
  628. }
  629. void
  630. rspamd_upstreams_set_flags (struct upstream_list *ups,
  631. enum rspamd_upstream_flag flags)
  632. {
  633. ups->flags = flags;
  634. }
  635. void
  636. rspamd_upstreams_set_rotation (struct upstream_list *ups,
  637. enum rspamd_upstream_rotation rot)
  638. {
  639. ups->rot_alg = rot;
  640. }
  641. gboolean
  642. rspamd_upstream_add_addr (struct upstream *up, rspamd_inet_addr_t *addr)
  643. {
  644. struct upstream_addr_elt *elt;
  645. /*
  646. * XXX: slow and inefficient
  647. */
  648. if (up->addrs.addr == NULL) {
  649. up->addrs.addr = g_ptr_array_new_full (8, rspamd_upstream_addr_elt_dtor);
  650. }
  651. elt = g_malloc0 (sizeof (*elt));
  652. elt->addr = addr;
  653. g_ptr_array_add (up->addrs.addr, elt);
  654. g_ptr_array_sort (up->addrs.addr, rspamd_upstream_addr_sort_func);
  655. return TRUE;
  656. }
  657. gboolean
  658. rspamd_upstreams_parse_line (struct upstream_list *ups,
  659. const gchar *str, guint16 def_port, void *data)
  660. {
  661. const gchar *end = str + strlen (str), *p = str;
  662. const gchar *separators = ";, \n\r\t";
  663. gchar *tmp;
  664. guint len;
  665. gboolean ret = FALSE;
  666. if (g_ascii_strncasecmp (p, "random:", sizeof ("random:") - 1) == 0) {
  667. ups->rot_alg = RSPAMD_UPSTREAM_RANDOM;
  668. p += sizeof ("random:") - 1;
  669. }
  670. else if (g_ascii_strncasecmp (p,
  671. "master-slave:",
  672. sizeof ("master-slave:") - 1) == 0) {
  673. ups->rot_alg = RSPAMD_UPSTREAM_MASTER_SLAVE;
  674. p += sizeof ("master-slave:") - 1;
  675. }
  676. else if (g_ascii_strncasecmp (p,
  677. "round-robin:",
  678. sizeof ("round-robin:") - 1) == 0) {
  679. ups->rot_alg = RSPAMD_UPSTREAM_ROUND_ROBIN;
  680. p += sizeof ("round-robin:") - 1;
  681. }
  682. else if (g_ascii_strncasecmp (p,
  683. "hash:",
  684. sizeof ("hash:") - 1) == 0) {
  685. ups->rot_alg = RSPAMD_UPSTREAM_HASHED;
  686. p += sizeof ("hash:") - 1;
  687. }
  688. else if (g_ascii_strncasecmp (p,
  689. "sequential:",
  690. sizeof ("sequential:") - 1) == 0) {
  691. ups->rot_alg = RSPAMD_UPSTREAM_SEQUENTIAL;
  692. p += sizeof ("sequential:") - 1;
  693. }
  694. while (p < end) {
  695. len = strcspn (p, separators);
  696. if (len > 0) {
  697. tmp = g_malloc (len + 1);
  698. rspamd_strlcpy (tmp, p, len + 1);
  699. if (rspamd_upstreams_add_upstream (ups, tmp, def_port,
  700. RSPAMD_UPSTREAM_PARSE_DEFAULT,
  701. data)) {
  702. ret = TRUE;
  703. }
  704. g_free (tmp);
  705. }
  706. p += len;
  707. /* Skip separators */
  708. p += strspn (p, separators);
  709. }
  710. return ret;
  711. }
  712. gboolean
  713. rspamd_upstreams_from_ucl (struct upstream_list *ups,
  714. const ucl_object_t *in, guint16 def_port, void *data)
  715. {
  716. gboolean ret = FALSE;
  717. const ucl_object_t *cur;
  718. ucl_object_iter_t it = NULL;
  719. it = ucl_object_iterate_new (in);
  720. while ((cur = ucl_object_iterate_safe (it, true)) != NULL) {
  721. if (ucl_object_type (cur) == UCL_STRING) {
  722. ret = rspamd_upstreams_parse_line (ups, ucl_object_tostring (cur),
  723. def_port, data);
  724. }
  725. }
  726. ucl_object_iterate_free (it);
  727. return ret;
  728. }
  729. void
  730. rspamd_upstreams_destroy (struct upstream_list *ups)
  731. {
  732. guint i;
  733. struct upstream *up;
  734. struct upstream_list_watcher *w, *tmp;
  735. if (ups != NULL) {
  736. g_ptr_array_free (ups->alive, TRUE);
  737. for (i = 0; i < ups->ups->len; i ++) {
  738. up = g_ptr_array_index (ups->ups, i);
  739. up->ls = NULL;
  740. REF_RELEASE (up);
  741. }
  742. DL_FOREACH_SAFE (ups->watchers, w, tmp) {
  743. if (w->dtor) {
  744. w->dtor (w->ud);
  745. }
  746. g_free (w);
  747. }
  748. g_ptr_array_free (ups->ups, TRUE);
  749. rspamd_mutex_free (ups->lock);
  750. g_free (ups);
  751. }
  752. }
  753. static void
  754. rspamd_upstream_restore_cb (gpointer elt, gpointer ls)
  755. {
  756. struct upstream *up = (struct upstream *)elt;
  757. struct upstream_list *ups = (struct upstream_list *)ls;
  758. struct upstream_list_watcher *w;
  759. /* Here the upstreams list is already locked */
  760. RSPAMD_UPSTREAM_LOCK (up->lock);
  761. if (rspamd_event_pending (&up->ev, EV_TIMEOUT)) {
  762. event_del (&up->ev);
  763. }
  764. g_ptr_array_add (ups->alive, up);
  765. up->active_idx = ups->alive->len - 1;
  766. RSPAMD_UPSTREAM_UNLOCK (up->lock);
  767. DL_FOREACH (up->ls->watchers, w) {
  768. if (w->events_mask & RSPAMD_UPSTREAM_WATCH_ONLINE) {
  769. w->func (up, RSPAMD_UPSTREAM_WATCH_ONLINE, up->errors, w->ud);
  770. }
  771. }
  772. /* For revive event */
  773. REF_RELEASE (up);
  774. }
  775. static struct upstream*
  776. rspamd_upstream_get_random (struct upstream_list *ups)
  777. {
  778. guint idx = ottery_rand_range (ups->alive->len - 1);
  779. return g_ptr_array_index (ups->alive, idx);
  780. }
  781. static struct upstream*
  782. rspamd_upstream_get_round_robin (struct upstream_list *ups, gboolean use_cur)
  783. {
  784. guint max_weight = 0, min_checked = G_MAXUINT;
  785. struct upstream *up, *selected = NULL, *min_checked_sel = NULL;
  786. guint i;
  787. /* Select upstream with the maximum cur_weight */
  788. RSPAMD_UPSTREAM_LOCK (ups->lock);
  789. for (i = 0; i < ups->alive->len; i ++) {
  790. up = g_ptr_array_index (ups->alive, i);
  791. if (use_cur) {
  792. if (up->cur_weight > max_weight) {
  793. selected = up;
  794. max_weight = up->cur_weight;
  795. }
  796. }
  797. else {
  798. if (up->weight > max_weight) {
  799. selected = up;
  800. max_weight = up->weight;
  801. }
  802. }
  803. if (up->checked * (up->errors + 1) < min_checked) {
  804. min_checked_sel = up;
  805. min_checked = up->checked;
  806. }
  807. }
  808. if (max_weight == 0) {
  809. if (min_checked > G_MAXUINT / 2) {
  810. /* Reset all checked counters to avoid overflow */
  811. for (i = 0; i < ups->alive->len; i ++) {
  812. up = g_ptr_array_index (ups->alive, i);
  813. up->checked = 0;
  814. }
  815. }
  816. selected = min_checked_sel;
  817. }
  818. if (use_cur && selected) {
  819. if (selected->cur_weight > 0) {
  820. selected->cur_weight--;
  821. }
  822. else {
  823. selected->cur_weight = selected->weight;
  824. }
  825. }
  826. RSPAMD_UPSTREAM_UNLOCK (ups->lock);
  827. return selected;
  828. }
  829. /*
  830. * The key idea of this function is obtained from the following paper:
  831. * A Fast, Minimal Memory, Consistent Hash Algorithm
  832. * John Lamping, Eric Veach
  833. *
  834. * http://arxiv.org/abs/1406.2294
  835. */
  836. static guint32
  837. rspamd_consistent_hash (guint64 key, guint32 nbuckets)
  838. {
  839. gint64 b = -1, j = 0;
  840. while (j < nbuckets) {
  841. b = j;
  842. key *= 2862933555777941757ULL + 1;
  843. j = (b + 1) * (double)(1ULL << 31) / (double)((key >> 33) + 1ULL);
  844. }
  845. return b;
  846. }
  847. static struct upstream*
  848. rspamd_upstream_get_hashed (struct upstream_list *ups, const guint8 *key, guint keylen)
  849. {
  850. guint64 k;
  851. guint32 idx;
  852. /* Generate 64 bits input key */
  853. k = rspamd_cryptobox_fast_hash_specific (RSPAMD_CRYPTOBOX_XXHASH64,
  854. key, keylen, ups->hash_seed);
  855. RSPAMD_UPSTREAM_LOCK (ups->lock);
  856. idx = rspamd_consistent_hash (k, ups->alive->len);
  857. RSPAMD_UPSTREAM_UNLOCK (ups->lock);
  858. return g_ptr_array_index (ups->alive, idx);
  859. }
  860. static struct upstream*
  861. rspamd_upstream_get_common (struct upstream_list *ups,
  862. enum rspamd_upstream_rotation default_type,
  863. const guchar *key, gsize keylen, gboolean forced)
  864. {
  865. enum rspamd_upstream_rotation type;
  866. struct upstream *up = NULL;
  867. RSPAMD_UPSTREAM_LOCK (ups->lock);
  868. if (ups->alive->len == 0) {
  869. /* We have no upstreams alive */
  870. g_ptr_array_foreach (ups->ups, rspamd_upstream_restore_cb, ups);
  871. }
  872. RSPAMD_UPSTREAM_UNLOCK (ups->lock);
  873. if (!forced) {
  874. type = ups->rot_alg != RSPAMD_UPSTREAM_UNDEF ? ups->rot_alg : default_type;
  875. }
  876. else {
  877. type = default_type != RSPAMD_UPSTREAM_UNDEF ? default_type : ups->rot_alg;
  878. }
  879. if (type == RSPAMD_UPSTREAM_HASHED && (keylen == 0 || key == NULL)) {
  880. /* Cannot use hashed rotation when no key is specified, switch to random */
  881. type = RSPAMD_UPSTREAM_RANDOM;
  882. }
  883. switch (type) {
  884. default:
  885. case RSPAMD_UPSTREAM_RANDOM:
  886. up = rspamd_upstream_get_random (ups);
  887. break;
  888. case RSPAMD_UPSTREAM_HASHED:
  889. up = rspamd_upstream_get_hashed (ups, key, keylen);
  890. break;
  891. case RSPAMD_UPSTREAM_ROUND_ROBIN:
  892. up = rspamd_upstream_get_round_robin (ups, TRUE);
  893. break;
  894. case RSPAMD_UPSTREAM_MASTER_SLAVE:
  895. up = rspamd_upstream_get_round_robin (ups, FALSE);
  896. break;
  897. case RSPAMD_UPSTREAM_SEQUENTIAL:
  898. if (ups->cur_elt >= ups->alive->len) {
  899. ups->cur_elt = 0;
  900. return NULL;
  901. }
  902. up = g_ptr_array_index (ups->alive, ups->cur_elt ++);
  903. break;
  904. }
  905. if (up) {
  906. up->checked ++;
  907. }
  908. return up;
  909. }
  910. struct upstream*
  911. rspamd_upstream_get (struct upstream_list *ups,
  912. enum rspamd_upstream_rotation default_type,
  913. const guchar *key, gsize keylen)
  914. {
  915. return rspamd_upstream_get_common (ups, default_type, key, keylen, FALSE);
  916. }
  917. struct upstream*
  918. rspamd_upstream_get_forced (struct upstream_list *ups,
  919. enum rspamd_upstream_rotation forced_type,
  920. const guchar *key, gsize keylen)
  921. {
  922. return rspamd_upstream_get_common (ups, forced_type, key, keylen, TRUE);
  923. }
  924. void
  925. rspamd_upstream_reresolve (struct upstream_ctx *ctx)
  926. {
  927. GList *cur;
  928. struct upstream *up;
  929. cur = ctx->upstreams->head;
  930. while (cur) {
  931. up = cur->data;
  932. REF_RETAIN (up);
  933. rspamd_upstream_resolve_addrs (up->ls, up);
  934. REF_RELEASE (up);
  935. cur = g_list_next (cur);
  936. }
  937. }
  938. gpointer
  939. rspamd_upstream_set_data (struct upstream *up, gpointer data)
  940. {
  941. gpointer prev_data = up->data;
  942. up->data = data;
  943. return prev_data;
  944. }
  945. gpointer
  946. rspamd_upstream_get_data (struct upstream *up)
  947. {
  948. return up->data;
  949. }
  950. void
  951. rspamd_upstreams_foreach (struct upstream_list *ups,
  952. rspamd_upstream_traverse_func cb, void *ud)
  953. {
  954. struct upstream *up;
  955. guint i;
  956. for (i = 0; i < ups->ups->len; i ++) {
  957. up = g_ptr_array_index (ups->ups, i);
  958. cb (up, i, ud);
  959. }
  960. }
  961. void
  962. rspamd_upstreams_set_limits (struct upstream_list *ups,
  963. gdouble revive_time,
  964. gdouble revive_jitter,
  965. gdouble error_time,
  966. gdouble dns_timeout,
  967. guint max_errors,
  968. guint dns_retransmits)
  969. {
  970. g_assert (ups != NULL);
  971. if (!isnan (revive_time)) {
  972. ups->limits.revive_time = revive_time;
  973. }
  974. if (!isnan (revive_jitter)) {
  975. ups->limits.revive_jitter = revive_jitter;
  976. }
  977. if (!isnan (error_time)) {
  978. ups->limits.error_time = error_time;
  979. }
  980. if (!isnan (dns_timeout)) {
  981. ups->limits.dns_timeout = dns_timeout;
  982. }
  983. if (max_errors > 0) {
  984. ups->limits.max_errors = max_errors;
  985. }
  986. if (dns_retransmits > 0) {
  987. ups->limits.dns_retransmits = dns_retransmits;
  988. }
  989. }
  990. void rspamd_upstreams_add_watch_callback (struct upstream_list *ups,
  991. enum rspamd_upstreams_watch_event events,
  992. rspamd_upstream_watch_func func,
  993. GFreeFunc dtor,
  994. gpointer ud)
  995. {
  996. struct upstream_list_watcher *nw;
  997. g_assert ((events & RSPAMD_UPSTREAM_WATCH_ALL) != 0);
  998. nw = g_malloc (sizeof (*nw));
  999. nw->func = func;
  1000. nw->events_mask = events;
  1001. nw->ud = ud;
  1002. nw->dtor = dtor;
  1003. DL_APPEND (ups->watchers, nw);
  1004. }