You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

16 years ago
16 years ago
9 years ago
9 years ago
9 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
12 years ago
16 years ago
16 years ago
16 years ago
16 years ago
16 years ago
16 years ago
16 years ago
16 years ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730
  1. /*
  2. * Copyright 2024 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "blas-config.h"
  18. #include "rspamd.h"
  19. #include "libserver/maps/map.h"
  20. #include "lua/lua_common.h"
  21. #include "libserver/worker_util.h"
  22. #include "libserver/rspamd_control.h"
  23. #include "ottery.h"
  24. #include "cryptobox.h"
  25. #include "utlist.h"
  26. #include "unix-std.h"
  27. /* pwd and grp */
  28. #ifdef HAVE_PWD_H
  29. #include <pwd.h>
  30. #endif
  31. #ifdef HAVE_GRP_H
  32. #include <grp.h>
  33. #endif
  34. #ifdef HAVE_NFTW
  35. #include <ftw.h>
  36. #endif
  37. #include <signal.h>
  38. #ifdef HAVE_SYS_RESOURCE_H
  39. #include <sys/resource.h>
  40. #endif
  41. #ifdef HAVE_LIBUTIL_H
  42. #include <libutil.h>
  43. #endif
  44. #ifdef HAVE_OPENSSL
  45. #include <openssl/err.h>
  46. #include <openssl/evp.h>
  47. #include <math.h>
  48. #endif
  49. #include "sqlite3.h"
  50. #include "contrib/libev/ev.h"
  51. #ifdef WITH_HYPERSCAN
  52. #include "libserver/hyperscan_tools.h"
  53. #endif
  54. /* 2 seconds to fork new process in place of dead one */
  55. #define SOFT_FORK_TIME 2
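  /* Timer interval (in seconds) used, after a termination signal is received, on the way to terminating all workers with SIGKILL. NOTE(review): this comment used to say "10 seconds", which does not match the 0.2 value below - confirm the intended timeout. */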
  57. #define TERMINATION_INTERVAL (0.2)
  58. static gboolean load_rspamd_config(struct rspamd_main *rspamd_main,
  59. struct rspamd_config *cfg,
  60. gboolean init_modules,
  61. enum rspamd_post_load_options opts,
  62. gboolean reload);
  63. static void rspamd_cld_handler(EV_P_ ev_child *w,
  64. struct rspamd_main *rspamd_main,
  65. struct rspamd_worker *wrk);
  /* Control socket */
  static int control_fd;
  static ev_io control_ev;
  static struct rspamd_stat old_stat;
  static ev_timer stat_ev;
  static gboolean valgrind_mode = FALSE;
  /*
   * Cmdline options: most of these variables are filled by GOption parsing
   * through the `entries` table defined below.
   */
  static gboolean no_fork = FALSE;      /* -f / --no-fork */
  static gboolean show_version = FALSE; /* -v / --version */
  static char **cfg_names = NULL;       /* -c / --config (array of file names) */
  static char *rspamd_user = NULL;      /* -u / --user */
  static char *rspamd_group = NULL;     /* -g / --group */
  static char *rspamd_pidfile = NULL;   /* -p / --pid */
  static gboolean is_debug = FALSE;     /* -d / --debug */
  static gboolean is_insecure = FALSE;  /* -i / --insecure */
  /* name -> value pairs collected by rspamd_parse_var() for each --var option */
  static GHashTable *ucl_vars = NULL;
  static char **lua_env = NULL;          /* --lua-env (array of file names) */
  static gboolean skip_template = FALSE; /* -T / --skip-template */
  /* rspamd_worker_wait() sends SIGKILL only once this counter is below zero */
  static int term_attempts = 0;
  /*
   * Active listen sockets: keys are the hashes produced by make_listen_key(),
   * values are GList chains of listen sockets (see spawn_workers())
   */
  static GHashTable *listen_sockets = NULL;
  /* Defined in modules.c */
  extern module_t *modules[];
  extern worker_t *workers[];
  90. /* Command line options */
  91. static gboolean rspamd_parse_var(const char *option_name,
  92. const char *value, gpointer data,
  93. GError **error);
  /*
   * Command line option table passed to g_option_context_add_main_entries()
   * in read_cmd_line(). Each entry stores its parsed value in one of the
   * file-scope variables above, except for `--var`, which is dispatched to the
   * rspamd_parse_var() callback. The table is terminated by an all-NULL entry.
   */
  static GOptionEntry entries[] =
      {
          {"no-fork", 'f', 0, G_OPTION_ARG_NONE, &no_fork,
           "Do not daemonize main process", NULL},
          {"config", 'c', 0, G_OPTION_ARG_FILENAME_ARRAY, &cfg_names,
           "Specify config file(s)", NULL},
          {"user", 'u', 0, G_OPTION_ARG_STRING, &rspamd_user,
           "User to run rspamd as", NULL},
          {"group", 'g', 0, G_OPTION_ARG_STRING, &rspamd_group,
           "Group to run rspamd as", NULL},
          {"pid", 'p', 0, G_OPTION_ARG_STRING, &rspamd_pidfile, "Path to pidfile",
           NULL},
          {"debug", 'd', 0, G_OPTION_ARG_NONE, &is_debug, "Force debug output",
           NULL},
          {"insecure", 'i', 0, G_OPTION_ARG_NONE, &is_insecure,
           "Ignore running workers as privileged users (insecure)", NULL},
          {"version", 'v', 0, G_OPTION_ARG_NONE, &show_version,
           "Show version and exit", NULL},
          /* Callback option: has no short name, value is handled by rspamd_parse_var() */
          {"var", 0, 0, G_OPTION_ARG_CALLBACK, (gpointer) &rspamd_parse_var,
           "Redefine/define environment variable", NULL},
          {"skip-template", 'T', 0, G_OPTION_ARG_NONE, &skip_template,
           "Do not apply Jinja templates", NULL},
          {"lua-env", '\0', 0, G_OPTION_ARG_FILENAME_ARRAY, &lua_env,
           "Load lua environment from the specified files", NULL},
          /* Sentinel */
          {NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}};
  119. static gboolean
  120. rspamd_parse_var(const char *option_name,
  121. const char *value, gpointer data,
  122. GError **error)
  123. {
  124. char *k, *v, *t;
  125. t = strchr(value, '=');
  126. if (t != NULL) {
  127. k = g_strdup(value);
  128. t = k + (t - value);
  129. v = g_strdup(t + 1);
  130. *t = '\0';
  131. if (ucl_vars == NULL) {
  132. ucl_vars = g_hash_table_new_full(rspamd_strcase_hash,
  133. rspamd_strcase_equal, g_free, g_free);
  134. }
  135. g_hash_table_insert(ucl_vars, k, v);
  136. }
  137. else {
  138. g_set_error(error, g_quark_try_string("main"), EINVAL,
  139. "Bad variable format: %s", value);
  140. return FALSE;
  141. }
  142. return TRUE;
  143. }
  144. static void
  145. read_cmd_line(int *argc, char ***argv, struct rspamd_config *cfg)
  146. {
  147. GError *error = NULL;
  148. GOptionContext *context;
  149. unsigned int cfg_num;
  150. context = g_option_context_new("- run rspamd daemon");
  151. #if defined(GIT_VERSION) && GIT_VERSION == 1
  152. g_option_context_set_summary(context,
  153. "Summary:\n Rspamd daemon version " RVERSION "-git\n Git id: " RID);
  154. #else
  155. g_option_context_set_summary(context,
  156. "Summary:\n Rspamd daemon version " RVERSION);
  157. #endif
  158. g_option_context_add_main_entries(context, entries, NULL);
  159. if (!g_option_context_parse(context, argc, argv, &error)) {
  160. fprintf(stderr, "option parsing failed: %s\n", error->message);
  161. g_option_context_free(context);
  162. exit(EXIT_FAILURE);
  163. }
  164. cfg->rspamd_user = rspamd_user;
  165. cfg->rspamd_group = rspamd_group;
  166. cfg_num = cfg_names != NULL ? g_strv_length(cfg_names) : 0;
  167. if (cfg_num == 0) {
  168. cfg->cfg_name = FIXED_CONFIG_FILE;
  169. }
  170. else {
  171. cfg->cfg_name = cfg_names[0];
  172. g_assert(cfg_num == 1);
  173. }
  174. cfg->pid_file = rspamd_pidfile;
  175. g_option_context_free(context);
  176. }
  177. static int
  178. rspamd_write_pid(struct rspamd_main *main)
  179. {
  180. pid_t pid;
  181. if (main->cfg->pid_file == NULL) {
  182. return -1;
  183. }
  184. main->pfh = rspamd_pidfile_open(main->cfg->pid_file, 0644, &pid);
  185. if (main->pfh == NULL) {
  186. return -1;
  187. }
  188. if (main->is_privileged) {
  189. /* Force root user as owner of pid file */
  190. #ifdef HAVE_PIDFILE_FILENO
  191. if (fchown(pidfile_fileno(main->pfh), 0, 0) == -1) {
  192. #else
  193. if (fchown(main->pfh->pf_fd, 0, 0) == -1) {
  194. #endif
  195. }
  196. }
  197. rspamd_pidfile_write(main->pfh);
  198. return 0;
  199. }
  200. /* Detect privileged mode */
  201. static void
  202. detect_priv(struct rspamd_main *rspamd_main)
  203. {
  204. struct passwd *pwd;
  205. struct group *grp;
  206. uid_t euid;
  207. euid = geteuid();
  208. if (euid == 0) {
  209. if (!rspamd_main->cfg->rspamd_user && !is_insecure) {
  210. msg_err_main(
  211. "cannot run rspamd workers as root user, please add -u and -g options to select a proper unprivileged user or specify --insecure flag");
  212. exit(EXIT_FAILURE);
  213. }
  214. else if (is_insecure) {
  215. rspamd_main->is_privileged = TRUE;
  216. rspamd_main->workers_uid = 0;
  217. rspamd_main->workers_gid = 0;
  218. }
  219. else {
  220. rspamd_main->is_privileged = TRUE;
  221. pwd = getpwnam(rspamd_main->cfg->rspamd_user);
  222. if (pwd == NULL) {
  223. msg_err_main("user specified does not exists (%s), aborting",
  224. strerror(errno));
  225. exit(-errno);
  226. }
  227. if (rspamd_main->cfg->rspamd_group) {
  228. grp = getgrnam(rspamd_main->cfg->rspamd_group);
  229. if (grp == NULL) {
  230. msg_err_main("group specified does not exists (%s), aborting",
  231. strerror(errno));
  232. exit(-errno);
  233. }
  234. rspamd_main->workers_gid = grp->gr_gid;
  235. }
  236. else {
  237. /* Use the main group of user */
  238. rspamd_main->workers_gid = pwd->pw_gid;
  239. }
  240. rspamd_main->workers_uid = pwd->pw_uid;
  241. }
  242. }
  243. else {
  244. rspamd_main->is_privileged = FALSE;
  245. rspamd_main->workers_uid = (uid_t) -1;
  246. rspamd_main->workers_gid = (gid_t) -1;
  247. }
  248. }
  249. static void
  250. config_logger(rspamd_mempool_t *pool, gpointer ud)
  251. {
  252. struct rspamd_main *rspamd_main = ud;
  253. rspamd_main->logger = rspamd_log_open_specific(rspamd_main->server_pool,
  254. rspamd_main->cfg,
  255. "main",
  256. rspamd_main->workers_uid,
  257. rspamd_main->workers_gid);
  258. if (rspamd_main->logger == NULL) {
  259. /*
  260. * XXX:
  261. * Error has been already logged (in fact,
  262. * we might fall back to console logger here)
  263. */
  264. exit(EXIT_FAILURE);
  265. }
  266. rspamd_logger_configure_modules(rspamd_main->cfg->debug_modules);
  267. }
  /*
   * Reloads the configuration from the file named by the current config.
   *
   * A new config object is created and loaded; on success it replaces the old
   * one (the old config and the old logger are released), on failure the old
   * config and logger pointers are restored and the new config is released.
   *
   * @param rspamd_main main server structure whose `cfg` is replaced
   * @return TRUE when the new config is in place, FALSE when the old one is kept
   */
  static gboolean
  reread_config(struct rspamd_main *rspamd_main)
  {
      struct rspamd_config *tmp_cfg, *old_cfg;
      char *cfg_file;
      int load_opts = RSPAMD_CONFIG_INIT_VALIDATE | RSPAMD_CONFIG_INIT_SYMCACHE |
                      RSPAMD_CONFIG_INIT_LIBS | RSPAMD_CONFIG_INIT_URL;

      /* Save the symbols cache of the current config before anything is replaced */
      rspamd_symcache_save(rspamd_main->cfg->cache);
      tmp_cfg = rspamd_config_new(RSPAMD_CONFIG_INIT_DEFAULT);
      /* The libraries context is shared with the old config, hence the extra reference */
      tmp_cfg->libs_ctx = rspamd_main->cfg->libs_ctx;
      REF_RETAIN(tmp_cfg->libs_ctx);
      /* The name is copied into the new config's pool, so it does not point into the old config */
      cfg_file = rspamd_mempool_strdup(tmp_cfg->cfg_pool,
                                       rspamd_main->cfg->cfg_name);
      /* Save some variables */
      tmp_cfg->cfg_name = cfg_file;
      old_cfg = rspamd_main->cfg;
      rspamd_main->cfg = tmp_cfg;
      /* Remember the current logger so that it can be restored on failure */
      rspamd_logger_t *old_logger = rspamd_main->logger;

      if (!load_rspamd_config(rspamd_main, tmp_cfg, TRUE, load_opts, TRUE)) {
          /* Roll back to the previous config and logger */
          rspamd_main->cfg = old_cfg;
          rspamd_main->logger = old_logger;
          msg_err_main("cannot parse new config file, revert to old one");
          REF_RELEASE(tmp_cfg);
          return FALSE;
      }
      else {
          rspamd_log_close(old_logger);
          msg_info_main("replacing config");
          REF_RELEASE(old_cfg);
          /* Command line user/group are applied to the new config as well */
          rspamd_main->cfg->rspamd_user = rspamd_user;
          rspamd_main->cfg->rspamd_group = rspamd_group;
          /* Here, we can do post actions with the existing config */
          /*
           * As some rules are defined in lua, we need to process them first
           * (lua post-load), then init the filters and only afterwards run the
           * generic post-load actions
           */
          rspamd_lua_post_load_config(tmp_cfg);
          rspamd_init_filters(tmp_cfg, true, false);
          /* Do post-load actions */
          rspamd_config_post_load(tmp_cfg,
                                  load_opts | RSPAMD_CONFIG_INIT_POST_LOAD_LUA | RSPAMD_CONFIG_INIT_PRELOAD_MAPS);
          msg_info_main("config has been reread successfully");
      }

      return TRUE;
  }
  /*
   * State of a worker whose fork has been postponed by rspamd_fork_delayed();
   * allocated there and freed in rspamd_fork_delayed_cb().
   */
  struct waiting_worker {
      struct rspamd_main *rspamd_main; /* main server structure used for the fork */
      struct ev_timer wait_ev;         /* one-shot timer; its `data` points back to this struct */
      struct rspamd_worker_conf *cf;   /* worker configuration, retained while waiting */
      unsigned int oldindex;           /* worker index passed to rspamd_fork_worker() */
  };
  319. static void
  320. rspamd_fork_delayed_cb(EV_P_ ev_timer *w, int revents)
  321. {
  322. struct waiting_worker *waiting_worker = (struct waiting_worker *) w->data;
  323. ev_timer_stop(EV_A_ & waiting_worker->wait_ev);
  324. rspamd_fork_worker(waiting_worker->rspamd_main, waiting_worker->cf,
  325. waiting_worker->oldindex,
  326. waiting_worker->rspamd_main->event_loop,
  327. rspamd_cld_handler, listen_sockets);
  328. REF_RELEASE(waiting_worker->cf);
  329. g_free(waiting_worker);
  330. }
  331. static void
  332. rspamd_fork_delayed(struct rspamd_worker_conf *cf,
  333. unsigned int index,
  334. struct rspamd_main *rspamd_main)
  335. {
  336. struct waiting_worker *nw;
  337. nw = g_malloc0(sizeof(*nw));
  338. nw->cf = cf;
  339. nw->oldindex = index;
  340. nw->rspamd_main = rspamd_main;
  341. REF_RETAIN(cf);
  342. nw->wait_ev.data = nw;
  343. ev_timer_init(&nw->wait_ev, rspamd_fork_delayed_cb, SOFT_FORK_TIME, 0.0);
  344. ev_timer_start(rspamd_main->event_loop, &nw->wait_ev);
  345. }
  346. static GList *
  347. create_listen_socket(GPtrArray *addrs, unsigned int cnt,
  348. enum rspamd_worker_socket_type listen_type)
  349. {
  350. GList *result = NULL;
  351. int fd;
  352. unsigned int i;
  353. static const int listen_opts = RSPAMD_INET_ADDRESS_LISTEN_ASYNC;
  354. struct rspamd_worker_listen_socket *ls;
  355. g_ptr_array_sort(addrs, rspamd_inet_address_compare_ptr);
  356. for (i = 0; i < cnt; i++) {
  357. /*
  358. * Copy address to avoid reload issues
  359. */
  360. if (listen_type & RSPAMD_WORKER_SOCKET_TCP) {
  361. fd = rspamd_inet_address_listen(g_ptr_array_index(addrs, i),
  362. SOCK_STREAM,
  363. listen_opts, -1);
  364. if (fd != -1) {
  365. ls = g_malloc0(sizeof(*ls));
  366. ls->addr = rspamd_inet_address_copy(g_ptr_array_index(addrs, i), NULL);
  367. ls->fd = fd;
  368. ls->type = RSPAMD_WORKER_SOCKET_TCP;
  369. result = g_list_prepend(result, ls);
  370. }
  371. }
  372. if (listen_type & RSPAMD_WORKER_SOCKET_UDP) {
  373. fd = rspamd_inet_address_listen(g_ptr_array_index(addrs, i),
  374. SOCK_DGRAM,
  375. listen_opts | RSPAMD_INET_ADDRESS_LISTEN_REUSEPORT, -1);
  376. if (fd != -1) {
  377. ls = g_malloc0(sizeof(*ls));
  378. ls->addr = rspamd_inet_address_copy(g_ptr_array_index(addrs, i), NULL);
  379. ls->fd = fd;
  380. ls->type = RSPAMD_WORKER_SOCKET_UDP;
  381. result = g_list_prepend(result, ls);
  382. }
  383. }
  384. }
  385. return result;
  386. }
  387. static GList *
  388. systemd_get_socket(struct rspamd_main *rspamd_main, const char *fdname)
  389. {
  390. int number, sock, num_passed, flags;
  391. GList *result = NULL;
  392. const char *e;
  393. char **fdnames;
  394. char *end;
  395. struct stat st;
  396. static const int sd_listen_fds_start = 3; /* SD_LISTEN_FDS_START */
  397. struct rspamd_worker_listen_socket *ls;
  398. union {
  399. struct sockaddr_storage ss;
  400. struct sockaddr sa;
  401. struct sockaddr_un sun;
  402. struct sockaddr_in6 s6;
  403. } addr_storage;
  404. socklen_t slen = sizeof(addr_storage);
  405. int stype;
  406. number = strtoul(fdname, &end, 10);
  407. if (end != NULL && *end != '\0') {
  408. /* Cannot parse as number, assume a name in LISTEN_FDNAMES. */
  409. e = getenv("LISTEN_FDNAMES");
  410. if (!e) {
  411. msg_err_main("cannot get systemd variable 'LISTEN_FDNAMES'");
  412. errno = ENOENT;
  413. return NULL;
  414. }
  415. fdnames = g_strsplit(e, ":", -1);
  416. for (number = 0; fdnames[number]; number++) {
  417. if (!strcmp(fdnames[number], fdname)) {
  418. break;
  419. }
  420. }
  421. if (!fdnames[number]) {
  422. number = -1;
  423. }
  424. g_strfreev(fdnames);
  425. }
  426. if (number < 0) {
  427. msg_warn_main("cannot find systemd socket: %s", fdname);
  428. errno = ENOENT;
  429. return NULL;
  430. }
  431. e = getenv("LISTEN_FDS");
  432. if (e != NULL) {
  433. errno = 0;
  434. num_passed = strtoul(e, &end, 10);
  435. if ((end == NULL || *end == '\0') && num_passed > number) {
  436. sock = number + sd_listen_fds_start;
  437. if (fstat(sock, &st) == -1) {
  438. msg_warn_main("cannot stat systemd descriptor %d", sock);
  439. return NULL;
  440. }
  441. if (!S_ISSOCK(st.st_mode)) {
  442. msg_warn_main("systemd descriptor %d is not a socket", sock);
  443. errno = EINVAL;
  444. return NULL;
  445. }
  446. flags = fcntl(sock, F_GETFD);
  447. if (flags != -1) {
  448. (void) fcntl(sock, F_SETFD, flags | FD_CLOEXEC);
  449. }
  450. rspamd_socket_nonblocking(sock);
  451. if (getsockname(sock, &addr_storage.sa, &slen) == -1) {
  452. msg_warn_main("cannot get name for systemd descriptor %d: %s",
  453. sock, strerror(errno));
  454. errno = EINVAL;
  455. return NULL;
  456. }
  457. ls = g_malloc0(sizeof(*ls));
  458. ls->addr = rspamd_inet_address_from_sa(&addr_storage.sa, slen);
  459. ls->fd = sock;
  460. ls->is_systemd = true;
  461. slen = sizeof(stype);
  462. if (getsockopt(sock, SOL_SOCKET, SO_TYPE, &stype, &slen) != -1) {
  463. if (stype == SOCK_STREAM) {
  464. ls->type = RSPAMD_WORKER_SOCKET_TCP;
  465. }
  466. else {
  467. ls->type = RSPAMD_WORKER_SOCKET_UDP;
  468. }
  469. }
  470. else {
  471. msg_warn_main("cannot get type for systemd descriptor %d: %s",
  472. sock, strerror(errno));
  473. ls->type = RSPAMD_WORKER_SOCKET_TCP;
  474. }
  475. result = g_list_prepend(result, ls);
  476. }
  477. else if (num_passed <= number) {
  478. msg_err_main("systemd LISTEN_FDS does not contain the expected fd: %d",
  479. num_passed);
  480. errno = EINVAL;
  481. }
  482. }
  483. else {
  484. msg_err_main("cannot get systemd variable 'LISTEN_FDS'");
  485. errno = ENOENT;
  486. }
  487. return result;
  488. }
  489. static void
  490. pass_signal_cb(gpointer key, gpointer value, gpointer ud)
  491. {
  492. struct rspamd_worker *cur = value;
  493. int signo = GPOINTER_TO_INT(ud);
  494. kill(cur->pid, signo);
  495. }
  496. static void
  497. rspamd_pass_signal(GHashTable *workers, int signo)
  498. {
  499. g_hash_table_foreach(workers, pass_signal_cb, GINT_TO_POINTER(signo));
  500. }
  501. static inline uintptr_t
  502. make_listen_key(struct rspamd_worker_bind_conf *cf)
  503. {
  504. rspamd_cryptobox_fast_hash_state_t st;
  505. unsigned int i, keylen = 0;
  506. uint8_t *key;
  507. rspamd_inet_addr_t *addr;
  508. uint16_t port;
  509. rspamd_cryptobox_fast_hash_init(&st, rspamd_hash_seed());
  510. if (cf->is_systemd) {
  511. /* Something like 'systemd:0' or 'systemd:controller'. */
  512. rspamd_cryptobox_fast_hash_update(&st, cf->name, strlen(cf->name));
  513. }
  514. else {
  515. rspamd_cryptobox_fast_hash_update(&st, cf->name, strlen(cf->name));
  516. for (i = 0; i < cf->cnt; i++) {
  517. addr = g_ptr_array_index(cf->addrs, i);
  518. key = rspamd_inet_address_get_hash_key(
  519. addr, &keylen);
  520. rspamd_cryptobox_fast_hash_update(&st, key, keylen);
  521. port = rspamd_inet_address_get_port(addr);
  522. rspamd_cryptobox_fast_hash_update(&st, &port, sizeof(port));
  523. }
  524. }
  525. return rspamd_cryptobox_fast_hash_final(&st);
  526. }
  527. static void
  528. spawn_worker_type(struct rspamd_main *rspamd_main, struct ev_loop *event_loop,
  529. struct rspamd_worker_conf *cf)
  530. {
  531. int i;
  532. if (cf->count < 0) {
  533. msg_info_main("skip spawning of worker %s: disabled in configuration",
  534. cf->worker->name);
  535. return;
  536. }
  537. if (cf->worker->flags & RSPAMD_WORKER_UNIQUE) {
  538. if (cf->count > 1) {
  539. msg_warn_main(
  540. "cannot spawn more than 1 %s worker, so spawn one",
  541. cf->worker->name);
  542. }
  543. rspamd_fork_worker(rspamd_main, cf, 0, event_loop, rspamd_cld_handler,
  544. listen_sockets);
  545. }
  546. else if (cf->worker->flags & RSPAMD_WORKER_THREADED) {
  547. rspamd_fork_worker(rspamd_main, cf, 0, event_loop, rspamd_cld_handler,
  548. listen_sockets);
  549. }
  550. else {
  551. for (i = 0; i < cf->count; i++) {
  552. rspamd_fork_worker(rspamd_main, cf, i, event_loop,
  553. rspamd_cld_handler, listen_sockets);
  554. }
  555. }
  556. }
  /*
   * Spawns all workers described in the configuration.
   *
   * For each enabled worker configuration with a positive count, listen
   * sockets are looked up in (or created and added to) the `listen_sockets`
   * table when the worker type has sockets, and the worker processes are then
   * forked via spawn_worker_type(). If no listen socket could be obtained for
   * a worker that needs one, the server is terminated.
   *
   * Afterwards every worker type flagged RSPAMD_WORKER_ALWAYS_START that was
   * not spawned from the configuration gets a default configuration with a
   * single instance and is spawned as well.
   *
   * @param rspamd_main main server structure
   * @param ev_base event loop handed to the forked workers
   */
  static void
  spawn_workers(struct rspamd_main *rspamd_main, struct ev_loop *ev_base)
  {
      GList *cur, *ls;
      struct rspamd_worker_conf *cf;
      gpointer p;
      guintptr key;
      struct rspamd_worker_bind_conf *bcf;
      gboolean listen_ok = FALSE;
      GPtrArray *seen_mandatory_workers;
      worker_t **cw, *wrk;
      unsigned int i;

      /* Special hack for hs_helper if it's not defined in a config */
      seen_mandatory_workers = g_ptr_array_new();
      cur = rspamd_main->cfg->workers;

      while (cur) {
          cf = cur->data;
          listen_ok = FALSE;

          if (cf->worker == NULL) {
              msg_err_main("type of worker is unspecified, skip spawning");
          }
          else {
              if (!cf->enabled || cf->count <= 0) {
                  msg_info_main("worker of type %s(%s) is disabled in the config, "
                                "skip spawning",
                                g_quark_to_string(cf->type),
                                cf->bind_conf ? cf->bind_conf->name : "none");
                  cur = g_list_next(cur);

                  continue;
              }

              /* Remember mandatory workers that are started from the config */
              if (cf->worker->flags & RSPAMD_WORKER_ALWAYS_START) {
                  g_ptr_array_add(seen_mandatory_workers, cf->worker);
              }
              if (cf->worker->flags & RSPAMD_WORKER_HAS_SOCKET) {
                  LL_FOREACH(cf->bind_conf, bcf)
                  {
                      key = make_listen_key(bcf);

                      /*
                       * NOTE(review): the lookup converts the key with
                       * GINT_TO_POINTER while the insert below uses a plain
                       * (gpointer) cast - confirm both always yield the same
                       * pointer value on all supported platforms.
                       */
                      if ((p =
                               g_hash_table_lookup(listen_sockets,
                                                   GINT_TO_POINTER(key))) == NULL) {

                          if (!bcf->is_systemd) {
                              /* Create listen socket */
                              ls = create_listen_socket(bcf->addrs, bcf->cnt,
                                                        cf->worker->listen_type);
                          }
                          else {
                              /* For systemd binds the first array element is passed as the fd name */
                              ls = systemd_get_socket(rspamd_main,
                                                      g_ptr_array_index(bcf->addrs, 0));
                          }

                          if (ls == NULL) {
                              msg_err_main("cannot listen on %s socket %s: %s",
                                           bcf->is_systemd ? "systemd" : "normal",
                                           bcf->name,
                                           strerror(errno));
                          }
                          else {
                              g_hash_table_insert(listen_sockets, (gpointer) key, ls);
                              listen_ok = TRUE;
                          }
                      }
                      else {
                          /* We had socket for this type of worker */
                          ls = p;
                          listen_ok = TRUE;
                      }
                      /* Do not add existing lists as it causes loops */
                      if (g_list_position(cf->listen_socks, ls) == -1) {
                          cf->listen_socks = g_list_concat(cf->listen_socks, ls);
                      }
                  }

                  /* One usable bind configuration is enough to spawn the worker */
                  if (listen_ok) {
                      spawn_worker_type(rspamd_main, ev_base, cf);
                  }
                  else {
                      if (cf->bind_conf == NULL) {
                          msg_err_main("cannot create listen socket for %s",
                                       g_quark_to_string(cf->type));
                      }
                      else {
                          msg_err_main("cannot create listen socket for %s at %s",
                                       g_quark_to_string(cf->type), cf->bind_conf->name);
                      }

                      rspamd_hard_terminate(rspamd_main);
                      g_assert_not_reached();
                  }
              }
              else {
                  spawn_worker_type(rspamd_main, ev_base, cf);
              }
          }

          cur = g_list_next(cur);
      }

      /* Start mandatory worker types that were not spawned from the config */
      for (cw = workers; *cw != NULL; cw++) {
          gboolean seen = FALSE;

          wrk = *cw;

          if (wrk->flags & RSPAMD_WORKER_ALWAYS_START) {
              for (i = 0; i < seen_mandatory_workers->len; i++) {
                  if (wrk == g_ptr_array_index(seen_mandatory_workers, i)) {
                      seen = TRUE;
                      break;
                  }
              }

              if (!seen) {
                  /* Build a default single-instance configuration for this worker type */
                  cf = rspamd_config_new_worker(rspamd_main->cfg, NULL);
                  cf->count = 1;
                  cf->worker = wrk;
                  cf->type = g_quark_from_static_string(wrk->name);

                  if (cf->worker->worker_init_func) {
                      cf->ctx = cf->worker->worker_init_func(rspamd_main->cfg);
                  }

                  spawn_worker_type(rspamd_main, ev_base, cf);
              }
          }
      }

      g_ptr_array_free(seen_mandatory_workers, TRUE);
  }
  673. static void
  674. kill_old_workers(gpointer key, gpointer value, gpointer unused)
  675. {
  676. struct rspamd_worker *w = value;
  677. struct rspamd_main *rspamd_main;
  678. rspamd_main = w->srv;
  679. if (w->state == rspamd_worker_state_wanna_die) {
  680. w->state = rspamd_worker_state_terminating;
  681. kill(w->pid, SIGUSR2);
  682. ev_io_stop(rspamd_main->event_loop, &w->srv_ev);
  683. g_hash_table_remove_all(w->control_events_pending);
  684. msg_info_main("send signal to worker %P", w->pid);
  685. }
  686. else if (w->state != rspamd_worker_state_running) {
  687. msg_info_main("do not send signal to worker %P, already sent", w->pid);
  688. }
  689. }
  690. static void
  691. mark_old_workers(gpointer key, gpointer value, gpointer unused)
  692. {
  693. struct rspamd_worker *w = value;
  694. if (w->state == rspamd_worker_state_running) {
  695. w->state = rspamd_worker_state_wanna_die;
  696. }
  697. w->flags |= RSPAMD_WORKER_OLD_CONFIG;
  698. }
  699. static void
  700. rspamd_worker_wait(struct rspamd_worker *w)
  701. {
  702. struct rspamd_main *rspamd_main;
  703. rspamd_main = w->srv;
  704. if (term_attempts < 0) {
  705. if (w->cf->worker->flags & RSPAMD_WORKER_KILLABLE) {
  706. if (kill(w->pid, SIGKILL) == -1) {
  707. if (errno == ESRCH) {
  708. /* We have actually killed the process */
  709. return;
  710. }
  711. }
  712. else {
  713. msg_warn_main("terminate worker %s(%P) with SIGKILL",
  714. g_quark_to_string(w->type), w->pid);
  715. }
  716. }
  717. else {
  718. kill(w->pid, SIGKILL);
  719. if (errno == ESRCH) {
  720. /* We have actually killed the process */
  721. return;
  722. }
  723. else {
  724. msg_err_main("data corruption warning: terminating "
  725. "special worker %s(%P) with SIGKILL",
  726. g_quark_to_string(w->type), w->pid);
  727. }
  728. }
  729. }
  730. }
  731. static void
  732. hash_worker_wait_callback(gpointer key, gpointer value, gpointer unused)
  733. {
  734. rspamd_worker_wait((struct rspamd_worker *) value);
  735. }
  /*
   * Accumulator filled while walking the cores directory
   * (see rspamd_check_core_cb() and rspamd_check_core_limits()).
   */
  struct core_check_cbdata {
      struct rspamd_config *cfg; /* configuration being checked */
      gsize total_count;         /* number of regular files found */
      gsize total_size;          /* summed size of those files: st_blocks * 512 */
  };
  #ifdef HAVE_NFTW
  /* Totals collected by the nftw() walk started in rspamd_check_core_limits() */
  static struct core_check_cbdata cores_cbdata;

  /*
   * nftw() callback: accounts every regular file in `cores_cbdata`.
   * Always returns 0, so the walk is never interrupted.
   */
  static int
  rspamd_check_core_cb(const char *path, const struct stat *st,
                       int flag, struct FTW *ft)
  {
      if (!S_ISREG(st->st_mode)) {
          return 0;
      }

      cores_cbdata.total_count++;
      /* Use physical size instead of displayed one */
      cores_cbdata.total_size += st->st_blocks * 512;

      return 0;
  }
  #endif
  /*
   * Re-evaluates core files throttling (only when built with HAVE_NFTW).
   *
   * The cores directory is walked to count regular files and sum their sizes.
   * Throttling is enabled as soon as any configured limit (size or count) is
   * exceeded. It is disabled again only when every configured limit is
   * strictly above the current usage; previously a single satisfied limit
   * switched throttling off even though the other one was still exceeded,
   * which made the flag flap between consecutive checks.
   *
   * Nothing is done when no cores directory or no limit is configured.
   *
   * @param rspamd_main main server structure holding the throttling flag
   */
  static void
  rspamd_check_core_limits(struct rspamd_main *rspamd_main)
  {
  #ifdef HAVE_NFTW
      struct rspamd_config *cfg = rspamd_main->cfg;
      gboolean size_ok, count_ok;

      cores_cbdata.cfg = cfg;
      cores_cbdata.total_count = 0;
      cores_cbdata.total_size = 0;

      if (!cfg->cores_dir || !(cfg->max_cores_count || cfg->max_cores_size)) {
          return;
      }

      if (nftw(cfg->cores_dir, rspamd_check_core_cb, 1, FTW_MOUNT | FTW_PHYS) == -1) {
          msg_err_main("nftw failed for path %s: %s", cfg->cores_dir,
                       strerror(errno));
          return;
      }

      if (!rspamd_main->cores_throttling) {
          if (cfg->max_cores_size &&
              cores_cbdata.total_size > cfg->max_cores_size) {
              msg_warn_main(
                  "enable cores throttling as size of cores in"
                  " %s is %Hz, limit is %Hz",
                  cfg->cores_dir,
                  cores_cbdata.total_size,
                  cfg->max_cores_size);
              rspamd_main->cores_throttling = TRUE;
          }

          if (cfg->max_cores_count &&
              cores_cbdata.total_count > cfg->max_cores_count) {
              msg_warn_main(
                  "enable cores throttling as count of cores in"
                  " %s is %z, limit is %z",
                  cfg->cores_dir,
                  cores_cbdata.total_count,
                  cfg->max_cores_count);
              rspamd_main->cores_throttling = TRUE;
          }

          return;
      }

      /* An unset limit never keeps throttling enabled */
      size_ok = !cfg->max_cores_size ||
                cores_cbdata.total_size < cfg->max_cores_size;
      count_ok = !cfg->max_cores_count ||
                 cores_cbdata.total_count < cfg->max_cores_count;

      if (size_ok && count_ok) {
          if (cfg->max_cores_size) {
              msg_info_main(
                  "disable cores throttling as size of cores in"
                  " %s is now %Hz, limit is %Hz",
                  cfg->cores_dir,
                  cores_cbdata.total_size,
                  cfg->max_cores_size);
          }

          if (cfg->max_cores_count) {
              msg_info_main(
                  "disable cores throttling as count of cores in"
                  " %s is %z, limit is %z",
                  cfg->cores_dir,
                  cores_cbdata.total_count,
                  cfg->max_cores_count);
          }

          rspamd_main->cores_throttling = FALSE;
      }
  #endif
  }
  817. static void
  818. reopen_log_handler(gpointer key, gpointer value, gpointer unused)
  819. {
  820. struct rspamd_worker *w = value;
  821. struct rspamd_main *rspamd_main;
  822. rspamd_main = w->srv;
  823. if (kill(w->pid, SIGUSR1) == -1) {
  824. msg_err_main("kill failed for pid %P: %s", w->pid, strerror(errno));
  825. }
  826. }
  827. static gboolean
  828. load_rspamd_config(struct rspamd_main *rspamd_main,
  829. struct rspamd_config *cfg, gboolean init_modules,
  830. enum rspamd_post_load_options opts,
  831. gboolean reload)
  832. {
  833. cfg->compiled_modules = modules;
  834. cfg->compiled_workers = workers;
  835. if (!rspamd_config_read(cfg, cfg->cfg_name, config_logger, rspamd_main,
  836. ucl_vars, skip_template, lua_env)) {
  837. return FALSE;
  838. }
  839. /* Strictly set temp dir */
  840. if (!cfg->temp_dir) {
  841. msg_warn_main("tempdir is not set, trying to use $TMPDIR");
  842. cfg->temp_dir =
  843. rspamd_mempool_strdup(cfg->cfg_pool, getenv("TMPDIR"));
  844. if (!cfg->temp_dir) {
  845. msg_warn_main("$TMPDIR is empty too, using /tmp as default");
  846. cfg->temp_dir = rspamd_mempool_strdup(cfg->cfg_pool, "/tmp");
  847. }
  848. }
  849. if (!reload) {
  850. /*
  851. * As some rules are defined in lua, we need to process them, then init
  852. * modules and merely afterwards to init modules
  853. */
  854. rspamd_lua_post_load_config(cfg);
  855. if (init_modules) {
  856. if (!rspamd_init_filters(cfg, reload, false)) {
  857. return FALSE;
  858. }
  859. }
  860. /* Do post-load actions */
  861. if (!rspamd_config_post_load(cfg, opts)) {
  862. return FALSE;
  863. }
  864. }
  865. return TRUE;
  866. }
  867. static void
  868. rspamd_detach_worker(struct rspamd_main *rspamd_main, struct rspamd_worker *wrk)
  869. {
  870. ev_io_stop(rspamd_main->event_loop, &wrk->srv_ev);
  871. ev_timer_stop(rspamd_main->event_loop, &wrk->hb.heartbeat_ev);
  872. }
  873. static void
  874. rspamd_attach_worker(struct rspamd_main *rspamd_main, struct rspamd_worker *wrk)
  875. {
  876. ev_io_start(rspamd_main->event_loop, &wrk->srv_ev);
  877. ev_timer_start(rspamd_main->event_loop, &wrk->hb.heartbeat_ev);
  878. }
  879. static void
  880. stop_srv_ev(gpointer key, gpointer value, gpointer ud)
  881. {
  882. struct rspamd_worker *cur = (struct rspamd_worker *) value;
  883. struct rspamd_main *rspamd_main = (struct rspamd_main *) ud;
  884. rspamd_detach_worker(rspamd_main, cur);
  885. }
  886. static void
  887. start_srv_ev(gpointer key, gpointer value, gpointer ud)
  888. {
  889. struct rspamd_worker *cur = (struct rspamd_worker *) value;
  890. struct rspamd_main *rspamd_main = (struct rspamd_main *) ud;
  891. rspamd_attach_worker(rspamd_main, cur);
  892. }
  893. static void
  894. rspamd_final_timer_handler(EV_P_ ev_timer *w, int revents)
  895. {
  896. struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
  897. term_attempts--;
  898. g_hash_table_foreach(rspamd_main->workers, hash_worker_wait_callback,
  899. NULL);
  900. if (g_hash_table_size(rspamd_main->workers) == 0) {
  901. ev_break(rspamd_main->event_loop, EVBREAK_ALL);
  902. }
  903. }
  904. /* Signal handlers */
  905. static void
  906. rspamd_term_handler(struct ev_loop *loop, ev_signal *w, int revents)
  907. {
  908. struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
  909. static ev_timer ev_finale;
  910. ev_tstamp shutdown_ts;
  911. if (!rspamd_main->wanna_die) {
  912. rspamd_main->wanna_die = TRUE;
  913. shutdown_ts = MAX(SOFT_SHUTDOWN_TIME,
  914. rspamd_main->cfg->task_timeout * 2.0);
  915. msg_info_main("catch termination signal, waiting for %d children for %.2f seconds",
  916. (int) g_hash_table_size(rspamd_main->workers),
  917. valgrind_mode ? shutdown_ts * 10 : shutdown_ts);
  918. /* Stop srv events to avoid false notifications */
  919. g_hash_table_foreach(rspamd_main->workers, stop_srv_ev, rspamd_main);
  920. rspamd_pass_signal(rspamd_main->workers, SIGTERM);
  921. if (control_fd != -1) {
  922. ev_io_stop(rspamd_main->event_loop, &control_ev);
  923. close(control_fd);
  924. }
  925. if (valgrind_mode) {
  926. /* Special case if we are likely running with valgrind */
  927. term_attempts = shutdown_ts / TERMINATION_INTERVAL * 10;
  928. }
  929. else {
  930. term_attempts = shutdown_ts / TERMINATION_INTERVAL;
  931. }
  932. ev_finale.data = rspamd_main;
  933. ev_timer_init(&ev_finale, rspamd_final_timer_handler,
  934. TERMINATION_INTERVAL, TERMINATION_INTERVAL);
  935. ev_timer_start(rspamd_main->event_loop, &ev_finale);
  936. }
  937. }
  938. static void
  939. rspamd_usr1_handler(struct ev_loop *loop, ev_signal *w, int revents)
  940. {
  941. struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
  942. if (!rspamd_main->wanna_die) {
  943. rspamd_log_reopen(rspamd_main->logger,
  944. rspamd_main->cfg,
  945. rspamd_main->workers_uid,
  946. rspamd_main->workers_gid);
  947. msg_info_main("logging reinitialised");
  948. g_hash_table_foreach(rspamd_main->workers, reopen_log_handler,
  949. NULL);
  950. }
  951. }
  952. static void
  953. rspamd_stat_update_handler(struct ev_loop *loop, ev_timer *w, int revents)
  954. {
  955. struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
  956. struct rspamd_stat cur_stat;
  957. char proctitle[128];
  958. memcpy(&cur_stat, rspamd_main->stat, sizeof(cur_stat));
  959. if (old_stat.messages_scanned > 0 &&
  960. cur_stat.messages_scanned > old_stat.messages_scanned) {
  961. double rate = (double) (cur_stat.messages_scanned - old_stat.messages_scanned) /
  962. w->repeat;
  963. double old_spam = old_stat.actions_stat[METRIC_ACTION_REJECT] +
  964. old_stat.actions_stat[METRIC_ACTION_ADD_HEADER] +
  965. old_stat.actions_stat[METRIC_ACTION_REWRITE_SUBJECT];
  966. double old_ham = old_stat.actions_stat[METRIC_ACTION_NOACTION];
  967. double new_spam = cur_stat.actions_stat[METRIC_ACTION_REJECT] +
  968. cur_stat.actions_stat[METRIC_ACTION_ADD_HEADER] +
  969. cur_stat.actions_stat[METRIC_ACTION_REWRITE_SUBJECT];
  970. double new_ham = cur_stat.actions_stat[METRIC_ACTION_NOACTION];
  971. gsize cnt = MAX_AVG_TIME_SLOTS;
  972. float sum = rspamd_sum_floats(cur_stat.avg_time.avg_time, &cnt);
  973. rspamd_snprintf(proctitle, sizeof(proctitle),
  974. "main process; %.1f msg/sec, %.1f msg/sec spam, %.1f msg/sec ham; %.2fs avg processing time",
  975. rate,
  976. (new_spam - old_spam) / w->repeat,
  977. (new_ham - old_ham) / w->repeat,
  978. cnt > 0 ? sum / cnt : 0);
  979. rspamd_setproctitle("%s", proctitle);
  980. }
  981. memcpy(&old_stat, &cur_stat, sizeof(cur_stat));
  982. }
  983. static void
  984. rspamd_hup_handler(struct ev_loop *loop, ev_signal *w, int revents)
  985. {
  986. struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
  987. if (!rspamd_main->wanna_die) {
  988. msg_info_main("rspamd " RVERSION
  989. " is requested to reload configuration");
  990. /* Detach existing workers and stop their heartbeats */
  991. g_hash_table_foreach(rspamd_main->workers, stop_srv_ev, rspamd_main);
  992. if (reread_config(rspamd_main)) {
  993. rspamd_check_core_limits(rspamd_main);
  994. /* Mark old workers */
  995. g_hash_table_foreach(rspamd_main->workers, mark_old_workers, NULL);
  996. msg_info_main("spawn workers with a new config");
  997. spawn_workers(rspamd_main, rspamd_main->event_loop);
  998. msg_info_main("workers spawning has been finished");
  999. /* Kill marked */
  1000. msg_info_main("kill old workers");
  1001. g_hash_table_foreach(rspamd_main->workers, kill_old_workers, NULL);
  1002. }
  1003. else {
  1004. /* Reattach old workers */
  1005. msg_info_main("restore old workers with a old config");
  1006. g_hash_table_foreach(rspamd_main->workers, start_srv_ev, rspamd_main);
  1007. }
  1008. }
  1009. }
  1010. /* Called when a dead child has been found */
  1011. static void
  1012. rspamd_cld_handler(EV_P_ ev_child *w, struct rspamd_main *rspamd_main,
  1013. struct rspamd_worker *wrk)
  1014. {
  1015. gboolean need_refork;
  1016. static struct rspamd_control_command cmd;
  1017. /* Turn off locking for logger */
  1018. ev_child_stop(EV_A_ w);
  1019. /* Remove dead child form children list */
  1020. g_hash_table_remove(rspamd_main->workers, GSIZE_TO_POINTER(wrk->pid));
  1021. g_hash_table_remove_all(wrk->control_events_pending);
  1022. if (wrk->srv_pipe[0] != -1) {
  1023. /* Ugly workaround */
  1024. if (wrk->tmp_data) {
  1025. g_free(wrk->tmp_data);
  1026. }
  1027. rspamd_detach_worker(rspamd_main, wrk);
  1028. }
  1029. if (wrk->control_pipe[0] != -1) {
  1030. /* We also need to clean descriptors left */
  1031. close(wrk->control_pipe[0]);
  1032. close(wrk->srv_pipe[0]);
  1033. }
  1034. if (!rspamd_main->wanna_die) {
  1035. cmd.type = RSPAMD_CONTROL_CHILD_CHANGE;
  1036. cmd.cmd.child_change.what = rspamd_child_terminated;
  1037. cmd.cmd.child_change.pid = wrk->pid;
  1038. cmd.cmd.child_change.additional = w->rstatus;
  1039. rspamd_control_broadcast_srv_cmd(rspamd_main, &cmd, wrk->pid);
  1040. }
  1041. need_refork = rspamd_check_termination_clause(wrk->srv, wrk, w->rstatus);
  1042. if (need_refork) {
  1043. /* Fork another worker in replace of dead one */
  1044. msg_info_main("respawn process %s in lieu of terminated process with pid %P",
  1045. g_quark_to_string(wrk->type),
  1046. wrk->pid);
  1047. rspamd_check_core_limits(rspamd_main);
  1048. rspamd_fork_delayed(wrk->cf, wrk->index, rspamd_main);
  1049. }
  1050. else {
  1051. msg_info_main("do not respawn process %s after found terminated process with pid %P",
  1052. g_quark_to_string(wrk->type),
  1053. wrk->pid);
  1054. }
  1055. REF_RELEASE(wrk->cf);
  1056. g_hash_table_unref(wrk->control_events_pending);
  1057. g_free(wrk);
  1058. }
  1059. /* Control socket handler */
  1060. static void
  1061. rspamd_control_handler(EV_P_ ev_io *w, int revents)
  1062. {
  1063. struct rspamd_main *rspamd_main = (struct rspamd_main *) w->data;
  1064. rspamd_inet_addr_t *addr = NULL;
  1065. int nfd;
  1066. if ((nfd =
  1067. rspamd_accept_from_socket(w->fd, &addr, NULL, NULL)) == -1) {
  1068. msg_warn_main("accept failed: %s", strerror(errno));
  1069. return;
  1070. }
  1071. /* Check for EAGAIN */
  1072. if (nfd == 0) {
  1073. rspamd_inet_address_free(addr);
  1074. return;
  1075. }
  1076. msg_info_main("accepted control connection from %s",
  1077. rspamd_inet_address_to_string(addr));
  1078. rspamd_control_process_client_socket(rspamd_main, nfd, addr);
  1079. }
  1080. static unsigned int
  1081. rspamd_spair_hash(gconstpointer p)
  1082. {
  1083. return rspamd_cryptobox_fast_hash(p, PAIR_ID_LEN, rspamd_hash_seed());
  1084. }
  1085. static gboolean
  1086. rspamd_spair_equal(gconstpointer a, gconstpointer b)
  1087. {
  1088. return memcmp(a, b, PAIR_ID_LEN) == 0;
  1089. }
  1090. static void
  1091. rspamd_spair_close(gpointer p)
  1092. {
  1093. int *fds = p;
  1094. close(fds[0]);
  1095. close(fds[1]);
  1096. g_free(p);
  1097. }
  /*
   * Returns a static, human readable name of the CPU architecture this binary
   * was compiled for, chosen from compiler predefined macros. "Unknown" is
   * returned when none of the recognised macros is defined.
   */
  const char *
  get_cpu_architecture(void)
  {
      const char *arch_name;

  #if defined(__x86_64__) || defined(_M_X64)
      arch_name = "x86_64";
  #elif defined(__i386) || defined(_M_IX86)
      arch_name = "x86";
  #elif defined(__aarch64__)
      arch_name = "ARM64";
  #elif defined(__arm__) || defined(_M_ARM)
      arch_name = "ARM";
  #elif defined(__loongarch__) || defined(__loongarch64)
      arch_name = "LOONGARCH64";
  #elif defined(__mips__)
      arch_name = "MIPS";
  #elif defined(__powerpc__) || defined(_M_PPC)
      arch_name = "PowerPC";
  #elif defined(__sparc__)
      arch_name = "SPARC";
  #else
      arch_name = "Unknown";
  #endif

      return arch_name;
  }
  1121. static void
  1122. version(struct rspamd_main *rspamd_main)
  1123. {
  1124. #if defined(GIT_VERSION) && GIT_VERSION == 1
  1125. rspamd_printf("Rspamd daemon version " RVERSION "-git." RID "\n\n");
  1126. #else
  1127. rspamd_printf("Rspamd daemon version " RVERSION "\n\n");
  1128. #endif
  1129. rspamd_printf("CPU architecture %s; features: %s\n",
  1130. get_cpu_architecture(),
  1131. rspamd_main->cfg->libs_ctx->crypto_ctx->cpu_extensions);
  1132. #ifdef WITH_HYPERSCAN
  1133. rspamd_printf("Hyperscan enabled: TRUE\n");
  1134. #else
  1135. rspamd_printf("Hyperscan enabled: FALSE\n");
  1136. #endif
  1137. #ifdef WITH_JEMALLOC
  1138. rspamd_printf("Jemalloc enabled: TRUE\n");
  1139. #else
  1140. rspamd_printf("Jemalloc enabled: FALSE\n");
  1141. #endif
  1142. #ifdef WITH_LUAJIT
  1143. rspamd_printf("LuaJIT enabled: TRUE (LuaJIT version: %s)\n", LUAJIT_VERSION);
  1144. #else
  1145. rspamd_printf("LuaJIT enabled: FALSE (Lua version: %s)\n", LUA_VERSION);
  1146. #endif
  1147. #ifndef __has_feature
  1148. #define __has_feature(x) 0
  1149. #endif
  1150. #if (defined(__has_feature) && __has_feature(address_sanitizer)) || defined(ADDRESS_SANITIZER)
  1151. rspamd_printf("ASAN enabled: TRUE\n");
  1152. #else
  1153. rspamd_printf("ASAN enabled: FALSE\n");
  1154. #endif
  1155. #ifdef HAVE_CBLAS
  1156. rspamd_printf("BLAS enabled: TRUE\n");
  1157. #else
  1158. rspamd_printf("BLAS enabled: FALSE\n");
  1159. #endif
  1160. #ifdef WITH_FASTTEXT
  1161. rspamd_printf("Fasttext enabled: TRUE\n");
  1162. #else
  1163. rspamd_printf("Fasttext enabled: FALSE\n");
  1164. #endif
  1165. }
  1166. static gboolean
  1167. rspamd_main_daemon(struct rspamd_main *rspamd_main)
  1168. {
  1169. int fd;
  1170. pid_t old_pid = getpid();
  1171. switch (fork()) {
  1172. case -1:
  1173. msg_err_main("fork() failed: %s", strerror(errno));
  1174. return FALSE;
  1175. case 0:
  1176. break;
  1177. default:
  1178. /* Old process */
  1179. exit(0);
  1180. }
  1181. rspamd_log_on_fork(g_quark_from_static_string("main"),
  1182. rspamd_main->cfg,
  1183. rspamd_main->logger);
  1184. if (setsid() == -1) {
  1185. msg_err_main("setsid () failed: %s", strerror(errno));
  1186. return FALSE;
  1187. }
  1188. umask(0);
  1189. fd = open("/dev/null", O_RDWR);
  1190. if (fd == -1) {
  1191. msg_err_main("open(\"/dev/null\") failed: %s", strerror(errno));
  1192. return FALSE;
  1193. }
  1194. if (dup2(fd, STDIN_FILENO) == -1) {
  1195. msg_err_main("dup2(STDIN) failed: %s", strerror(errno));
  1196. return FALSE;
  1197. }
  1198. if (dup2(fd, STDOUT_FILENO) == -1) {
  1199. msg_err_main("dup2(STDOUT) failed: %s", strerror(errno));
  1200. return FALSE;
  1201. }
  1202. if (fd > STDERR_FILENO) {
  1203. if (close(fd) == -1) {
  1204. msg_err_main("close() failed: %s", strerror(errno));
  1205. return FALSE;
  1206. }
  1207. }
  1208. msg_info_main("daemonized successfully; old pid %P, new pid %P; pid file: %s",
  1209. old_pid, getpid(),
  1210. rspamd_main->cfg->pid_file);
  1211. return TRUE;
  1212. }
  1213. int main(int argc, char **argv, char **env)
  1214. {
  1215. int i, res = 0;
  1216. struct sigaction signals, sigpipe_act;
  1217. worker_t **pworker;
  1218. GQuark type;
  1219. rspamd_inet_addr_t *control_addr = NULL;
  1220. struct ev_loop *event_loop;
  1221. struct rspamd_main *rspamd_main;
  1222. gboolean skip_pid = FALSE;
  1223. sigset_t control_signals;
  1224. /* Block special signals on loading */
  1225. sigemptyset(&control_signals);
  1226. sigaddset(&control_signals, SIGHUP);
  1227. sigaddset(&control_signals, SIGUSR1);
  1228. sigaddset(&control_signals, SIGUSR2);
  1229. sigprocmask(SIG_BLOCK, &control_signals, NULL);
  1230. rspamd_main = (struct rspamd_main *) g_malloc0(sizeof(struct rspamd_main));
  1231. rspamd_main->server_pool = rspamd_mempool_new(rspamd_mempool_suggest_size(),
  1232. "main", 0);
  1233. rspamd_main->stat = rspamd_mempool_alloc0_shared_(rspamd_main->server_pool,
  1234. sizeof(struct rspamd_stat),
  1235. RSPAMD_ALIGNOF(struct rspamd_stat),
  1236. G_STRLOC);
  1237. /* Set all time slots to nan */
  1238. for (i = 0; i < MAX_AVG_TIME_SLOTS; i++) {
  1239. rspamd_main->stat->avg_time.avg_time[i] = NAN;
  1240. }
  1241. rspamd_main->cfg = rspamd_config_new(RSPAMD_CONFIG_INIT_DEFAULT);
  1242. rspamd_main->spairs = g_hash_table_new_full(rspamd_spair_hash,
  1243. rspamd_spair_equal, g_free, rspamd_spair_close);
  1244. rspamd_main->start_mtx = rspamd_mempool_get_mutex(rspamd_main->server_pool);
  1245. if (getenv("VALGRIND") != NULL) {
  1246. valgrind_mode = TRUE;
  1247. }
  1248. #ifndef HAVE_SETPROCTITLE
  1249. rspamd_init_title(rspamd_main->server_pool, argc, argv, env);
  1250. #endif
  1251. rspamd_main->cfg->libs_ctx = rspamd_init_libs();
  1252. memset(&signals, 0, sizeof(struct sigaction));
  1253. read_cmd_line(&argc, &argv, rspamd_main->cfg);
  1254. if (show_version) {
  1255. version(rspamd_main);
  1256. exit(EXIT_SUCCESS);
  1257. }
  1258. if (argc > 0) {
  1259. /* Parse variables */
  1260. for (i = 0; i < argc; i++) {
  1261. if (strchr(argv[i], '=') != NULL) {
  1262. char *k, *v, *t;
  1263. k = g_strdup(argv[i]);
  1264. t = strchr(k, '=');
  1265. v = g_strdup(t + 1);
  1266. *t = '\0';
  1267. if (ucl_vars == NULL) {
  1268. ucl_vars = g_hash_table_new_full(rspamd_strcase_hash,
  1269. rspamd_strcase_equal, g_free, g_free);
  1270. }
  1271. g_hash_table_insert(ucl_vars, k, v);
  1272. }
  1273. }
  1274. }
  1275. if (is_debug) {
  1276. rspamd_main->cfg->log_level = G_LOG_LEVEL_DEBUG;
  1277. }
  1278. else {
  1279. rspamd_main->cfg->log_level = G_LOG_LEVEL_MESSAGE;
  1280. }
  1281. type = g_quark_from_static_string("main");
  1282. /* First set logger to console logger */
  1283. rspamd_main->logger = rspamd_log_open_emergency(rspamd_main->server_pool, 0);
  1284. g_assert(rspamd_main->logger != NULL);
  1285. if (is_debug) {
  1286. rspamd_log_set_log_level(rspamd_main->logger, G_LOG_LEVEL_DEBUG);
  1287. }
  1288. else {
  1289. rspamd_log_set_log_level(rspamd_main->logger, G_LOG_LEVEL_MESSAGE);
  1290. }
  1291. g_log_set_default_handler(rspamd_glib_log_function, rspamd_main->logger);
  1292. g_set_printerr_handler(rspamd_glib_printerr_function);
  1293. detect_priv(rspamd_main);
  1294. msg_notice_main("rspamd " RVERSION
  1295. " is loading configuration, build id: " RID);
  1296. pworker = &workers[0];
  1297. while (*pworker) {
  1298. /* Init string quarks */
  1299. (void) g_quark_from_static_string((*pworker)->name);
  1300. pworker++;
  1301. }
  1302. /* Init listen sockets hash */
  1303. listen_sockets = g_hash_table_new(g_direct_hash, g_direct_equal);
  1304. sqlite3_initialize();
  1305. /* Load config */
  1306. if (!load_rspamd_config(rspamd_main, rspamd_main->cfg, TRUE,
  1307. RSPAMD_CONFIG_LOAD_ALL, FALSE)) {
  1308. exit(EXIT_FAILURE);
  1309. }
  1310. /* Override pidfile from configuration by command line argument */
  1311. if (rspamd_pidfile != NULL) {
  1312. rspamd_main->cfg->pid_file = rspamd_pidfile;
  1313. }
  1314. /* Force debug log */
  1315. if (is_debug) {
  1316. rspamd_log_set_log_level(rspamd_main->logger, G_LOG_LEVEL_DEBUG);
  1317. }
  1318. /* Create rolling history */
  1319. rspamd_main->history = rspamd_roll_history_new(rspamd_main->server_pool,
  1320. rspamd_main->cfg->history_rows,
  1321. rspamd_main->cfg);
  1322. msg_info_main("rspamd " RVERSION
  1323. " is starting, build id: " RID);
  1324. rspamd_main->cfg->cfg_name = rspamd_mempool_strdup(
  1325. rspamd_main->cfg->cfg_pool,
  1326. rspamd_main->cfg->cfg_name);
  1327. msg_info_main("cpu features: %s",
  1328. rspamd_main->cfg->libs_ctx->crypto_ctx->cpu_extensions);
  1329. msg_info_main("cryptobox configuration: curve25519(libsodium), "
  1330. "chacha20(%s), poly1305(libsodium), siphash(libsodium), blake2(libsodium), base64(%s)",
  1331. rspamd_main->cfg->libs_ctx->crypto_ctx->chacha20_impl,
  1332. rspamd_main->cfg->libs_ctx->crypto_ctx->base64_impl);
  1333. msg_info_main("libottery prf: %s", ottery_get_impl_name());
  1334. /* Daemonize */
  1335. if (!no_fork) {
  1336. if (!rspamd_main_daemon(rspamd_main)) {
  1337. exit(EXIT_FAILURE);
  1338. }
  1339. /* Close emergency logger */
  1340. rspamd_log_close(rspamd_log_emergency_logger());
  1341. }
  1342. /* Write info */
  1343. rspamd_main->pid = getpid();
  1344. rspamd_main->type = type;
  1345. rspamd_set_crash_handler(rspamd_main);
  1346. /* Ignore SIGPIPE as we handle write errors manually */
  1347. sigemptyset(&sigpipe_act.sa_mask);
  1348. sigaddset(&sigpipe_act.sa_mask, SIGPIPE);
  1349. sigpipe_act.sa_handler = SIG_IGN;
  1350. sigpipe_act.sa_flags = 0;
  1351. sigaction(SIGPIPE, &sigpipe_act, NULL);
  1352. if (rspamd_main->cfg->pid_file == NULL) {
  1353. msg_info_main("pid file is not specified, skipping writing it");
  1354. skip_pid = TRUE;
  1355. }
  1356. else if (no_fork) {
  1357. msg_info_main("skip writing pid in no-fork mode");
  1358. skip_pid = TRUE;
  1359. }
  1360. else if (rspamd_write_pid(rspamd_main) == -1) {
  1361. msg_err_main("cannot write pid file %s", rspamd_main->cfg->pid_file);
  1362. exit(-errno);
  1363. }
  1364. sigprocmask(SIG_BLOCK, &signals.sa_mask, NULL);
  1365. /* Set title */
  1366. rspamd_setproctitle("main process");
  1367. /* Open control socket if needed */
  1368. control_fd = -1;
  1369. if (rspamd_main->cfg->control_socket_path) {
  1370. if (!rspamd_parse_inet_address(&control_addr,
  1371. rspamd_main->cfg->control_socket_path,
  1372. strlen(rspamd_main->cfg->control_socket_path),
  1373. RSPAMD_INET_ADDRESS_PARSE_DEFAULT)) {
  1374. msg_err_main("cannot parse inet address %s",
  1375. rspamd_main->cfg->control_socket_path);
  1376. }
  1377. else {
  1378. control_fd = rspamd_inet_address_listen(control_addr, SOCK_STREAM,
  1379. RSPAMD_INET_ADDRESS_LISTEN_ASYNC, -1);
  1380. if (control_fd == -1) {
  1381. msg_err_main("cannot open control socket at path: %s",
  1382. rspamd_main->cfg->control_socket_path);
  1383. }
  1384. }
  1385. }
  1386. /* Maybe read roll history */
  1387. if (rspamd_main->history && rspamd_main->cfg->history_file) {
  1388. rspamd_roll_history_load(rspamd_main->history,
  1389. rspamd_main->cfg->history_file);
  1390. }
  1391. /* Init workers hash */
  1392. rspamd_main->workers = g_hash_table_new(g_direct_hash, g_direct_equal);
  1393. /* Unblock control signals */
  1394. sigprocmask(SIG_UNBLOCK, &control_signals, NULL);
  1395. /* Init event base */
  1396. event_loop = ev_default_loop(rspamd_config_ev_backend_get(rspamd_main->cfg));
  1397. rspamd_main->event_loop = event_loop;
  1398. if (event_loop) {
  1399. int loop_type = ev_backend(event_loop);
  1400. gboolean effective_backend;
  1401. const char *loop_str;
  1402. loop_str =
  1403. rspamd_config_ev_backend_to_string(loop_type, &effective_backend);
  1404. if (!effective_backend) {
  1405. msg_warn_main("event loop uses non-optimal backend: %s", loop_str);
  1406. }
  1407. else {
  1408. msg_info_main("event loop initialised with backend: %s", loop_str);
  1409. }
  1410. }
  1411. else {
  1412. msg_err("cannot init event loop! exiting");
  1413. exit(EXIT_FAILURE);
  1414. }
  1415. /* Unblock signals */
  1416. sigemptyset(&signals.sa_mask);
  1417. sigprocmask(SIG_SETMASK, &signals.sa_mask, NULL);
  1418. /* Set events for signals */
  1419. ev_signal_init(&rspamd_main->term_ev, rspamd_term_handler, SIGTERM);
  1420. rspamd_main->term_ev.data = rspamd_main;
  1421. ev_signal_start(event_loop, &rspamd_main->term_ev);
  1422. ev_signal_init(&rspamd_main->int_ev, rspamd_term_handler, SIGINT);
  1423. rspamd_main->int_ev.data = rspamd_main;
  1424. ev_signal_start(event_loop, &rspamd_main->int_ev);
  1425. ev_signal_init(&rspamd_main->hup_ev, rspamd_hup_handler, SIGHUP);
  1426. rspamd_main->hup_ev.data = rspamd_main;
  1427. ev_signal_start(event_loop, &rspamd_main->hup_ev);
  1428. ev_signal_init(&rspamd_main->usr1_ev, rspamd_usr1_handler, SIGUSR1);
  1429. rspamd_main->usr1_ev.data = rspamd_main;
  1430. ev_signal_start(event_loop, &rspamd_main->usr1_ev);
  1431. /* Update proctitle according to number of messages processed */
  1432. static const ev_tstamp stat_update_time = 10.0;
  1433. memset(&old_stat, 0, sizeof(old_stat));
  1434. stat_ev.data = rspamd_main;
  1435. ev_timer_init(&stat_ev, rspamd_stat_update_handler,
  1436. stat_update_time, stat_update_time);
  1437. ev_timer_start(event_loop, &stat_ev);
  1438. rspamd_check_core_limits(rspamd_main);
  1439. rspamd_mempool_lock_mutex(rspamd_main->start_mtx);
  1440. spawn_workers(rspamd_main, event_loop);
  1441. rspamd_mempool_unlock_mutex(rspamd_main->start_mtx);
  1442. rspamd_main->http_ctx = rspamd_http_context_create(rspamd_main->cfg,
  1443. event_loop, rspamd_main->cfg->ups_ctx);
  1444. if (control_fd != -1) {
  1445. msg_info_main("listening for control commands on %s",
  1446. rspamd_inet_address_to_string(control_addr));
  1447. ev_io_init(&control_ev, rspamd_control_handler, control_fd, EV_READ);
  1448. control_ev.data = rspamd_main;
  1449. ev_io_start(event_loop, &control_ev);
  1450. }
  1451. ev_loop(event_loop, 0);
  1452. /* Maybe save roll history */
  1453. if (rspamd_main->history && rspamd_main->cfg->history_file) {
  1454. rspamd_roll_history_save(rspamd_main->history,
  1455. rspamd_main->cfg->history_file);
  1456. }
  1457. if (rspamd_main->cfg->cache) {
  1458. rspamd_symcache_save(rspamd_main->cfg->cache);
  1459. }
  1460. msg_info_main("terminating...");
  1461. #ifdef WITH_HYPERSCAN
  1462. rspamd_hyperscan_cleanup_maybe();
  1463. #endif
  1464. REF_RELEASE(rspamd_main->cfg);
  1465. rspamd_log_close(rspamd_main->logger);
  1466. g_hash_table_unref(rspamd_main->spairs);
  1467. g_hash_table_unref(rspamd_main->workers);
  1468. rspamd_mempool_delete(rspamd_main->server_pool);
  1469. if (!skip_pid) {
  1470. rspamd_pidfile_close(rspamd_main->pfh);
  1471. }
  1472. rspamd_unset_crash_handler(rspamd_main);
  1473. g_free(rspamd_main);
  1474. ev_unref(event_loop);
  1475. sqlite3_shutdown();
  1476. if (control_addr) {
  1477. rspamd_inet_address_free(control_addr);
  1478. }
  1479. return (res);
  1480. }