You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

rspamd_control.c 39KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435
  1. /*
  2. * Copyright 2024 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rspamd.h"
  18. #include "rspamd_control.h"
  19. #include "worker_util.h"
  20. #include "libserver/http/http_connection.h"
  21. #include "libserver/http/http_private.h"
  22. #include "libutil/libev_helper.h"
  23. #include "unix-std.h"
  24. #include "utlist.h"
  25. #ifdef HAVE_SYS_RESOURCE_H
  26. #include <sys/resource.h>
  27. #endif
  28. #ifdef WITH_HYPERSCAN
  29. #include "hyperscan_tools.h"
  30. #endif
  31. #define msg_debug_control(...) rspamd_conditional_debug_fast(NULL, NULL, \
  32. rspamd_control_log_id, "control", NULL, \
  33. RSPAMD_LOG_FUNC, \
  34. __VA_ARGS__)
  35. static ev_tstamp io_timeout = 30.0;
  36. static ev_tstamp worker_io_timeout = 0.5;
  37. struct rspamd_control_session;
  38. struct rspamd_control_reply_elt {
  39. struct rspamd_control_reply reply;
  40. struct rspamd_io_ev ev;
  41. struct ev_loop *event_loop;
  42. struct rspamd_worker *worker;
  43. GQuark wrk_type;
  44. pid_t wrk_pid;
  45. rspamd_ev_cb handler;
  46. gpointer ud;
  47. int attached_fd;
  48. bool sent;
  49. struct rspamd_control_command cmd;
  50. GHashTable *pending_elts;
  51. struct rspamd_control_reply_elt *prev, *next;
  52. };
  53. struct rspamd_control_session {
  54. int fd;
  55. struct ev_loop *event_loop;
  56. struct rspamd_main *rspamd_main;
  57. struct rspamd_http_connection *conn;
  58. struct rspamd_control_command cmd;
  59. struct rspamd_control_reply_elt *replies;
  60. rspamd_inet_addr_t *addr;
  61. unsigned int replies_remain;
  62. gboolean is_reply;
  63. };
  64. static const struct rspamd_control_cmd_match {
  65. rspamd_ftok_t name;
  66. enum rspamd_control_type type;
  67. } cmd_matches[] = {
  68. {.name = {
  69. .begin = "/stat",
  70. .len = sizeof("/stat") - 1},
  71. .type = RSPAMD_CONTROL_STAT},
  72. {.name = {.begin = "/reload", .len = sizeof("/reload") - 1}, .type = RSPAMD_CONTROL_RELOAD},
  73. {.name = {.begin = "/reresolve", .len = sizeof("/reresolve") - 1}, .type = RSPAMD_CONTROL_RERESOLVE},
  74. {.name = {.begin = "/recompile", .len = sizeof("/recompile") - 1}, .type = RSPAMD_CONTROL_RECOMPILE},
  75. {.name = {.begin = "/fuzzystat", .len = sizeof("/fuzzystat") - 1}, .type = RSPAMD_CONTROL_FUZZY_STAT},
  76. {.name = {.begin = "/fuzzysync", .len = sizeof("/fuzzysync") - 1}, .type = RSPAMD_CONTROL_FUZZY_SYNC},
  77. };
  78. static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
  79. static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt);
  80. INIT_LOG_MODULE(control)
  81. void rspamd_control_send_error(struct rspamd_control_session *session,
  82. int code, const char *error_msg, ...)
  83. {
  84. struct rspamd_http_message *msg;
  85. rspamd_fstring_t *reply;
  86. va_list args;
  87. msg = rspamd_http_new_message(HTTP_RESPONSE);
  88. va_start(args, error_msg);
  89. msg->status = rspamd_fstring_new();
  90. rspamd_vprintf_fstring(&msg->status, error_msg, args);
  91. va_end(args);
  92. msg->date = time(NULL);
  93. msg->code = code;
  94. reply = rspamd_fstring_sized_new(msg->status->len + 16);
  95. rspamd_printf_fstring(&reply, "{\"error\":\"%V\"}", msg->status);
  96. rspamd_http_message_set_body_from_fstring_steal(msg, reply);
  97. rspamd_http_connection_reset(session->conn);
  98. rspamd_http_connection_write_message(session->conn,
  99. msg,
  100. NULL,
  101. "application/json",
  102. session,
  103. io_timeout);
  104. }
  105. static void
  106. rspamd_control_send_ucl(struct rspamd_control_session *session,
  107. ucl_object_t *obj)
  108. {
  109. struct rspamd_http_message *msg;
  110. rspamd_fstring_t *reply;
  111. msg = rspamd_http_new_message(HTTP_RESPONSE);
  112. msg->date = time(NULL);
  113. msg->code = 200;
  114. msg->status = rspamd_fstring_new_init("OK", 2);
  115. reply = rspamd_fstring_sized_new(BUFSIZ);
  116. rspamd_ucl_emit_fstring(obj, UCL_EMIT_JSON_COMPACT, &reply);
  117. rspamd_http_message_set_body_from_fstring_steal(msg, reply);
  118. rspamd_http_connection_reset(session->conn);
  119. rspamd_http_connection_write_message(session->conn,
  120. msg,
  121. NULL,
  122. "application/json",
  123. session,
  124. io_timeout);
  125. }
  126. static void
  127. rspamd_control_connection_close(struct rspamd_control_session *session)
  128. {
  129. struct rspamd_control_reply_elt *elt, *telt;
  130. struct rspamd_main *rspamd_main;
  131. rspamd_main = session->rspamd_main;
  132. msg_info_main("finished connection from %s",
  133. rspamd_inet_address_to_string(session->addr));
  134. DL_FOREACH_SAFE(session->replies, elt, telt)
  135. {
  136. rspamd_control_stop_pending(elt);
  137. }
  138. rspamd_inet_address_free(session->addr);
  139. rspamd_http_connection_unref(session->conn);
  140. close(session->fd);
  141. g_free(session);
  142. }
  143. static void
  144. rspamd_control_write_reply(struct rspamd_control_session *session)
  145. {
  146. ucl_object_t *rep, *cur, *workers;
  147. struct rspamd_control_reply_elt *elt;
  148. char tmpbuf[64];
  149. double total_utime = 0, total_systime = 0;
  150. struct ucl_parser *parser;
  151. unsigned int total_conns = 0;
  152. rep = ucl_object_typed_new(UCL_OBJECT);
  153. workers = ucl_object_typed_new(UCL_OBJECT);
  154. DL_FOREACH(session->replies, elt)
  155. {
  156. /* Skip incompatible worker for fuzzy_stat */
  157. if ((session->cmd.type == RSPAMD_CONTROL_FUZZY_STAT ||
  158. session->cmd.type == RSPAMD_CONTROL_FUZZY_SYNC) &&
  159. elt->wrk_type != g_quark_from_static_string("fuzzy")) {
  160. continue;
  161. }
  162. rspamd_snprintf(tmpbuf, sizeof(tmpbuf), "%P", elt->wrk_pid);
  163. cur = ucl_object_typed_new(UCL_OBJECT);
  164. ucl_object_insert_key(cur, ucl_object_fromstring(g_quark_to_string(elt->wrk_type)), "type", 0, false);
  165. switch (session->cmd.type) {
  166. case RSPAMD_CONTROL_STAT:
  167. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.stat.conns), "conns", 0, false);
  168. ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.stat.utime), "utime", 0, false);
  169. ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.stat.systime), "systime", 0, false);
  170. ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.stat.uptime), "uptime", 0, false);
  171. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.stat.maxrss), "maxrss", 0, false);
  172. total_utime += elt->reply.reply.stat.utime;
  173. total_systime += elt->reply.reply.stat.systime;
  174. total_conns += elt->reply.reply.stat.conns;
  175. break;
  176. case RSPAMD_CONTROL_RELOAD:
  177. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.reload.status), "status", 0, false);
  178. break;
  179. case RSPAMD_CONTROL_RECOMPILE:
  180. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.recompile.status), "status", 0, false);
  181. break;
  182. case RSPAMD_CONTROL_RERESOLVE:
  183. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.reresolve.status), "status", 0, false);
  184. break;
  185. case RSPAMD_CONTROL_FUZZY_STAT:
  186. if (elt->attached_fd != -1) {
  187. /* We have some data to parse */
  188. parser = ucl_parser_new(0);
  189. ucl_object_insert_key(cur,
  190. ucl_object_fromint(
  191. elt->reply.reply.fuzzy_stat.status),
  192. "status",
  193. 0,
  194. false);
  195. if (ucl_parser_add_fd(parser, elt->attached_fd)) {
  196. ucl_object_insert_key(cur, ucl_parser_get_object(parser),
  197. "data", 0, false);
  198. ucl_parser_free(parser);
  199. }
  200. else {
  201. ucl_object_insert_key(cur, ucl_object_fromstring(ucl_parser_get_error(parser)), "error", 0, false);
  202. ucl_parser_free(parser);
  203. }
  204. ucl_object_insert_key(cur,
  205. ucl_object_fromlstring(
  206. elt->reply.reply.fuzzy_stat.storage_id,
  207. MEMPOOL_UID_LEN - 1),
  208. "id",
  209. 0,
  210. false);
  211. }
  212. else {
  213. ucl_object_insert_key(cur,
  214. ucl_object_fromstring("missing file"),
  215. "error",
  216. 0,
  217. false);
  218. ucl_object_insert_key(cur,
  219. ucl_object_fromint(
  220. elt->reply.reply.fuzzy_stat.status),
  221. "status",
  222. 0,
  223. false);
  224. }
  225. break;
  226. case RSPAMD_CONTROL_FUZZY_SYNC:
  227. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.fuzzy_sync.status), "status", 0, false);
  228. break;
  229. default:
  230. break;
  231. }
  232. if (elt->attached_fd != -1) {
  233. close(elt->attached_fd);
  234. elt->attached_fd = -1;
  235. }
  236. ucl_object_insert_key(workers, cur, tmpbuf, 0, true);
  237. }
  238. ucl_object_insert_key(rep, workers, "workers", 0, false);
  239. if (session->cmd.type == RSPAMD_CONTROL_STAT) {
  240. /* Total stats */
  241. cur = ucl_object_typed_new(UCL_OBJECT);
  242. ucl_object_insert_key(cur, ucl_object_fromint(total_conns), "conns", 0, false);
  243. ucl_object_insert_key(cur, ucl_object_fromdouble(total_utime), "utime", 0, false);
  244. ucl_object_insert_key(cur, ucl_object_fromdouble(total_systime), "systime", 0, false);
  245. ucl_object_insert_key(rep, cur, "total", 0, false);
  246. }
  247. rspamd_control_send_ucl(session, rep);
  248. ucl_object_unref(rep);
  249. }
  250. static void
  251. rspamd_control_wrk_io(int fd, short what, gpointer ud)
  252. {
  253. struct rspamd_control_reply_elt *elt = ud;
  254. struct rspamd_control_session *session;
  255. unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  256. struct iovec iov;
  257. struct msghdr msg;
  258. gssize r;
  259. session = elt->ud;
  260. elt->attached_fd = -1;
  261. if (what == EV_READ) {
  262. iov.iov_base = &elt->reply;
  263. iov.iov_len = sizeof(elt->reply);
  264. memset(&msg, 0, sizeof(msg));
  265. msg.msg_control = fdspace;
  266. msg.msg_controllen = sizeof(fdspace);
  267. msg.msg_iov = &iov;
  268. msg.msg_iovlen = 1;
  269. r = recvmsg(fd, &msg, 0);
  270. if (r == -1) {
  271. msg_err("cannot read reply from the worker %P (%s): %s",
  272. elt->wrk_pid, g_quark_to_string(elt->wrk_type),
  273. strerror(errno));
  274. }
  275. else if (r >= (gssize) sizeof(elt->reply)) {
  276. if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
  277. elt->attached_fd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
  278. }
  279. }
  280. }
  281. else {
  282. /* Timeout waiting */
  283. msg_warn("timeout waiting reply from %P (%s)",
  284. elt->wrk_pid, g_quark_to_string(elt->wrk_type));
  285. }
  286. session->replies_remain--;
  287. rspamd_ev_watcher_stop(session->event_loop,
  288. &elt->ev);
  289. if (session->replies_remain == 0) {
  290. rspamd_control_write_reply(session);
  291. }
  292. }
  293. static void
  294. rspamd_control_error_handler(struct rspamd_http_connection *conn, GError *err)
  295. {
  296. struct rspamd_control_session *session = conn->ud;
  297. struct rspamd_main *rspamd_main;
  298. rspamd_main = session->rspamd_main;
  299. if (!session->is_reply) {
  300. msg_info_main("abnormally closing control connection: %e", err);
  301. session->is_reply = TRUE;
  302. rspamd_control_send_error(session, err->code, "%s", err->message);
  303. }
  304. else {
  305. rspamd_control_connection_close(session);
  306. }
  307. }
  308. void rspamd_pending_control_free(gpointer p)
  309. {
  310. struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *) p;
  311. if (rep_elt->sent) {
  312. rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev);
  313. }
  314. g_hash_table_unref(rep_elt->pending_elts);
  315. g_free(rep_elt);
  316. }
  317. static inline void
  318. rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
  319. int attached_fd, struct msghdr *msg)
  320. {
  321. struct cmsghdr *cmsg;
  322. struct iovec iov;
  323. unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  324. memset(msg, 0, sizeof(*msg));
  325. /* Attach fd to the message */
  326. if (attached_fd != -1) {
  327. memset(fdspace, 0, sizeof(fdspace));
  328. msg->msg_control = fdspace;
  329. msg->msg_controllen = sizeof(fdspace);
  330. cmsg = CMSG_FIRSTHDR(msg);
  331. cmsg->cmsg_level = SOL_SOCKET;
  332. cmsg->cmsg_type = SCM_RIGHTS;
  333. cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  334. memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
  335. }
  336. iov.iov_base = cmd;
  337. iov.iov_len = sizeof(*cmd);
  338. msg->msg_iov = &iov;
  339. msg->msg_iovlen = 1;
  340. }
  341. static void
  342. rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
  343. {
  344. GHashTable *htb;
  345. struct rspamd_main *rspamd_main;
  346. gsize pending;
  347. /* It stops event and frees hash */
  348. htb = elt->pending_elts;
  349. pending = g_hash_table_size(htb);
  350. msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid,
  351. g_quark_to_string(elt->wrk_type),
  352. (int) pending);
  353. if (elt->worker->state != rspamd_worker_state_terminating && pending != 0) {
  354. /* Invoke another event from the queue */
  355. GHashTableIter it;
  356. gpointer k, v;
  357. g_hash_table_iter_init(&it, elt->pending_elts);
  358. while (g_hash_table_iter_next(&it, &k, &v)) {
  359. struct rspamd_control_reply_elt *cur = v;
  360. if (!cur->sent) {
  361. struct msghdr msg;
  362. rspamd_main = cur->worker->srv;
  363. rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg);
  364. ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0);
  365. if (r == sizeof(cur->cmd)) {
  366. msg_debug_control("restarting pending event for %P(%s), %d events pending",
  367. cur->wrk_pid,
  368. g_quark_to_string(cur->wrk_type),
  369. (int) pending - 1);
  370. rspamd_ev_watcher_init(&cur->ev,
  371. cur->worker->control_pipe[0],
  372. EV_READ, cur->handler,
  373. cur);
  374. rspamd_ev_watcher_start(cur->event_loop,
  375. &cur->ev, worker_io_timeout);
  376. cur->sent = true;
  377. break; /* Exit the outer loop as we have invoked something */
  378. }
  379. else {
  380. msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
  381. (int) cur->cmd.type,
  382. cur->wrk_pid,
  383. g_quark_to_string(cur->wrk_type),
  384. cur->worker->control_pipe[0],
  385. strerror(errno));
  386. g_hash_table_remove(elt->pending_elts, cur);
  387. }
  388. }
  389. }
  390. }
  391. /* Remove from hash and performs the cleanup */
  392. g_hash_table_remove(elt->pending_elts, elt);
  393. }
  394. static struct rspamd_control_reply_elt *
  395. rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
  396. struct rspamd_control_command *cmd,
  397. int attached_fd,
  398. rspamd_ev_cb handler,
  399. gpointer ud,
  400. pid_t except_pid)
  401. {
  402. GHashTableIter it;
  403. struct rspamd_worker *wrk;
  404. struct rspamd_control_reply_elt *rep_elt, *res = NULL;
  405. gpointer k, v;
  406. struct msghdr msg;
  407. gssize r;
  408. g_hash_table_iter_init(&it, rspamd_main->workers);
  409. while (g_hash_table_iter_next(&it, &k, &v)) {
  410. wrk = v;
  411. /* No control pipe */
  412. if (wrk->control_pipe[0] == -1) {
  413. continue;
  414. }
  415. if (except_pid != 0 && wrk->pid == except_pid) {
  416. continue;
  417. }
  418. /* Worker is terminating, do not bother sending stuff */
  419. if (wrk->state == rspamd_worker_state_terminating) {
  420. continue;
  421. }
  422. rep_elt = g_malloc0(sizeof(*rep_elt));
  423. rep_elt->worker = wrk;
  424. rep_elt->wrk_pid = wrk->pid;
  425. rep_elt->wrk_type = wrk->type;
  426. rep_elt->event_loop = rspamd_main->event_loop;
  427. rep_elt->ud = ud;
  428. rep_elt->handler = handler;
  429. memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
  430. rep_elt->sent = false;
  431. if (g_hash_table_size(wrk->control_events_pending) == 0) {
  432. /* We can send command */
  433. rspamd_control_fill_msghdr(cmd, attached_fd, &msg);
  434. r = sendmsg(wrk->control_pipe[0], &msg, 0);
  435. if (r == sizeof(*cmd)) {
  436. rspamd_ev_watcher_init(&rep_elt->ev,
  437. wrk->control_pipe[0],
  438. EV_READ, handler,
  439. rep_elt);
  440. rspamd_ev_watcher_start(rspamd_main->event_loop,
  441. &rep_elt->ev, worker_io_timeout);
  442. rep_elt->sent = true;
  443. rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
  444. g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
  445. DL_APPEND(res, rep_elt);
  446. msg_debug_control("sent command %d to the worker %P(%s), fd: %d",
  447. (int) cmd->type,
  448. wrk->pid,
  449. g_quark_to_string(wrk->type),
  450. wrk->control_pipe[0]);
  451. }
  452. else {
  453. msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
  454. (int) cmd->type,
  455. wrk->pid,
  456. g_quark_to_string(wrk->type),
  457. wrk->control_pipe[0],
  458. strerror(errno));
  459. g_free(rep_elt);
  460. }
  461. }
  462. else {
  463. /* We need to wait till the last command is processed, or it will mess up all serialization */
  464. msg_debug_control("pending event for %P(%s), %d events pending",
  465. wrk->pid,
  466. g_quark_to_string(wrk->type),
  467. (int) g_hash_table_size(wrk->control_events_pending));
  468. rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
  469. g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
  470. DL_APPEND(res, rep_elt);
  471. }
  472. }
  473. return res;
  474. }
  475. void rspamd_control_broadcast_srv_cmd(struct rspamd_main *rspamd_main,
  476. struct rspamd_control_command *cmd,
  477. pid_t except_pid)
  478. {
  479. rspamd_control_broadcast_cmd(rspamd_main, cmd, -1,
  480. rspamd_control_ignore_io_handler, NULL, except_pid);
  481. }
  482. static int
  483. rspamd_control_finish_handler(struct rspamd_http_connection *conn,
  484. struct rspamd_http_message *msg)
  485. {
  486. struct rspamd_control_session *session = conn->ud;
  487. rspamd_ftok_t srch;
  488. unsigned int i;
  489. gboolean found = FALSE;
  490. struct rspamd_control_reply_elt *cur;
  491. if (!session->is_reply) {
  492. if (msg->url == NULL) {
  493. rspamd_control_connection_close(session);
  494. return 0;
  495. }
  496. srch.begin = msg->url->str;
  497. srch.len = msg->url->len;
  498. session->is_reply = TRUE;
  499. for (i = 0; i < G_N_ELEMENTS(cmd_matches); i++) {
  500. if (rspamd_ftok_casecmp(&srch, &cmd_matches[i].name) == 0) {
  501. session->cmd.type = cmd_matches[i].type;
  502. found = TRUE;
  503. break;
  504. }
  505. }
  506. if (!found) {
  507. rspamd_control_send_error(session, 404, "Command not defined");
  508. }
  509. else {
  510. /* Send command to all workers */
  511. session->replies = rspamd_control_broadcast_cmd(
  512. session->rspamd_main, &session->cmd, -1,
  513. rspamd_control_wrk_io, session, 0);
  514. DL_FOREACH(session->replies, cur)
  515. {
  516. session->replies_remain++;
  517. }
  518. }
  519. }
  520. else {
  521. rspamd_control_connection_close(session);
  522. }
  523. return 0;
  524. }
  525. void rspamd_control_process_client_socket(struct rspamd_main *rspamd_main,
  526. int fd, rspamd_inet_addr_t *addr)
  527. {
  528. struct rspamd_control_session *session;
  529. session = g_malloc0(sizeof(*session));
  530. session->fd = fd;
  531. session->conn = rspamd_http_connection_new_server(rspamd_main->http_ctx,
  532. fd,
  533. NULL,
  534. rspamd_control_error_handler,
  535. rspamd_control_finish_handler,
  536. 0);
  537. session->rspamd_main = rspamd_main;
  538. session->addr = addr;
  539. session->event_loop = rspamd_main->event_loop;
  540. rspamd_http_connection_read_message(session->conn, session,
  541. io_timeout);
  542. }
  543. struct rspamd_worker_control_data {
  544. ev_io io_ev;
  545. struct rspamd_worker *worker;
  546. struct ev_loop *ev_base;
  547. struct {
  548. rspamd_worker_control_handler handler;
  549. gpointer ud;
  550. } handlers[RSPAMD_CONTROL_MAX];
  551. };
  552. static void
  553. rspamd_control_default_cmd_handler(int fd,
  554. int attached_fd,
  555. struct rspamd_worker_control_data *cd,
  556. struct rspamd_control_command *cmd)
  557. {
  558. struct rspamd_control_reply rep;
  559. gssize r;
  560. struct rusage rusg;
  561. struct rspamd_config *cfg;
  562. struct rspamd_main *rspamd_main;
  563. memset(&rep, 0, sizeof(rep));
  564. rep.type = cmd->type;
  565. rspamd_main = cd->worker->srv;
  566. switch (cmd->type) {
  567. case RSPAMD_CONTROL_STAT:
  568. if (getrusage(RUSAGE_SELF, &rusg) == -1) {
  569. msg_err_main("cannot get rusage stats: %s",
  570. strerror(errno));
  571. }
  572. else {
  573. rep.reply.stat.utime = tv_to_double(&rusg.ru_utime);
  574. rep.reply.stat.systime = tv_to_double(&rusg.ru_stime);
  575. rep.reply.stat.maxrss = rusg.ru_maxrss;
  576. }
  577. rep.reply.stat.conns = cd->worker->nconns;
  578. rep.reply.stat.uptime = rspamd_get_calendar_ticks() - cd->worker->start_time;
  579. break;
  580. case RSPAMD_CONTROL_RELOAD:
  581. case RSPAMD_CONTROL_RECOMPILE:
  582. case RSPAMD_CONTROL_HYPERSCAN_LOADED:
  583. case RSPAMD_CONTROL_MONITORED_CHANGE:
  584. case RSPAMD_CONTROL_FUZZY_STAT:
  585. case RSPAMD_CONTROL_FUZZY_SYNC:
  586. case RSPAMD_CONTROL_LOG_PIPE:
  587. case RSPAMD_CONTROL_CHILD_CHANGE:
  588. case RSPAMD_CONTROL_FUZZY_BLOCKED:
  589. break;
  590. case RSPAMD_CONTROL_RERESOLVE:
  591. if (cd->worker->srv->cfg) {
  592. REF_RETAIN(cd->worker->srv->cfg);
  593. cfg = cd->worker->srv->cfg;
  594. if (cfg->ups_ctx) {
  595. msg_info_config("reresolving upstreams");
  596. rspamd_upstream_reresolve(cfg->ups_ctx);
  597. }
  598. rep.reply.reresolve.status = 0;
  599. REF_RELEASE(cfg);
  600. }
  601. else {
  602. rep.reply.reresolve.status = EINVAL;
  603. }
  604. break;
  605. default:
  606. break;
  607. }
  608. r = write(fd, &rep, sizeof(rep));
  609. if (r != sizeof(rep)) {
  610. msg_err_main("cannot write reply to the control socket: %s",
  611. strerror(errno));
  612. }
  613. if (attached_fd != -1) {
  614. close(attached_fd);
  615. }
  616. }
  617. static void
  618. rspamd_control_default_worker_handler(EV_P_ ev_io *w, int revents)
  619. {
  620. struct rspamd_worker_control_data *cd =
  621. (struct rspamd_worker_control_data *) w->data;
  622. static struct rspamd_control_command cmd;
  623. static struct msghdr msg;
  624. static struct iovec iov;
  625. static unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  626. int rfd = -1;
  627. gssize r;
  628. iov.iov_base = &cmd;
  629. iov.iov_len = sizeof(cmd);
  630. memset(&msg, 0, sizeof(msg));
  631. msg.msg_control = fdspace;
  632. msg.msg_controllen = sizeof(fdspace);
  633. msg.msg_iov = &iov;
  634. msg.msg_iovlen = 1;
  635. r = recvmsg(w->fd, &msg, 0);
  636. if (r == -1) {
  637. if (errno != EAGAIN && errno != EINTR) {
  638. if (errno != ECONNRESET) {
  639. /*
  640. * In case of connection reset it means that main process
  641. * has died, so do not pollute logs
  642. */
  643. msg_err("cannot read request from the control socket: %s",
  644. strerror(errno));
  645. }
  646. ev_io_stop(cd->ev_base, &cd->io_ev);
  647. close(w->fd);
  648. }
  649. }
  650. else if (r < (int) sizeof(cmd)) {
  651. msg_err("short read of control command: %d of %d", (int) r,
  652. (int) sizeof(cmd));
  653. if (r == 0) {
  654. ev_io_stop(cd->ev_base, &cd->io_ev);
  655. close(w->fd);
  656. }
  657. }
  658. else if ((int) cmd.type >= 0 && cmd.type < RSPAMD_CONTROL_MAX) {
  659. if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
  660. rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
  661. }
  662. if (cd->handlers[cmd.type].handler) {
  663. cd->handlers[cmd.type].handler(cd->worker->srv,
  664. cd->worker,
  665. w->fd,
  666. rfd,
  667. &cmd,
  668. cd->handlers[cmd.type].ud);
  669. }
  670. else {
  671. rspamd_control_default_cmd_handler(w->fd, rfd, cd, &cmd);
  672. }
  673. }
  674. else {
  675. msg_err("unknown command: %d", (int) cmd.type);
  676. }
  677. }
  678. void rspamd_control_worker_add_default_cmd_handlers(struct rspamd_worker *worker,
  679. struct ev_loop *ev_base)
  680. {
  681. struct rspamd_worker_control_data *cd;
  682. cd = g_malloc0(sizeof(*cd));
  683. cd->worker = worker;
  684. cd->ev_base = ev_base;
  685. cd->io_ev.data = cd;
  686. ev_io_init(&cd->io_ev, rspamd_control_default_worker_handler,
  687. worker->control_pipe[1], EV_READ);
  688. ev_io_start(ev_base, &cd->io_ev);
  689. worker->control_data = cd;
  690. }
  691. /**
  692. * Register custom handler for a specific control command for this worker
  693. */
  694. void rspamd_control_worker_add_cmd_handler(struct rspamd_worker *worker,
  695. enum rspamd_control_type type,
  696. rspamd_worker_control_handler handler,
  697. gpointer ud)
  698. {
  699. struct rspamd_worker_control_data *cd;
  700. g_assert(type >= 0 && type < RSPAMD_CONTROL_MAX);
  701. g_assert(handler != NULL);
  702. g_assert(worker->control_data != NULL);
  703. cd = worker->control_data;
  704. cd->handlers[type].handler = handler;
  705. cd->handlers[type].ud = ud;
  706. }
  707. struct rspamd_srv_reply_data {
  708. struct rspamd_worker *worker;
  709. struct rspamd_main *srv;
  710. int fd;
  711. struct rspamd_srv_reply rep;
  712. };
  713. static void
  714. rspamd_control_ignore_io_handler(int fd, short what, void *ud)
  715. {
  716. struct rspamd_control_reply_elt *elt =
  717. (struct rspamd_control_reply_elt *) ud;
  718. struct rspamd_control_reply rep;
  719. /* At this point we just ignore replies from the workers */
  720. if (read(fd, &rep, sizeof(rep)) == -1) {
  721. msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
  722. }
  723. rspamd_control_stop_pending(elt);
  724. }
  725. static void
  726. rspamd_control_log_pipe_io_handler(int fd, short what, void *ud)
  727. {
  728. struct rspamd_control_reply_elt *elt =
  729. (struct rspamd_control_reply_elt *) ud;
  730. struct rspamd_control_reply rep;
  731. /* At this point we just ignore replies from the workers */
  732. (void) !read(fd, &rep, sizeof(rep));
  733. rspamd_control_stop_pending(elt);
  734. }
  735. static void
  736. rspamd_control_handle_on_fork(struct rspamd_srv_command *cmd,
  737. struct rspamd_main *srv)
  738. {
  739. struct rspamd_worker *parent, *child;
  740. parent = g_hash_table_lookup(srv->workers,
  741. GSIZE_TO_POINTER(cmd->cmd.on_fork.ppid));
  742. if (parent == NULL) {
  743. msg_err("cannot find parent for a forked process %P (%P child)",
  744. cmd->cmd.on_fork.ppid, cmd->cmd.on_fork.cpid);
  745. return;
  746. }
  747. if (cmd->cmd.on_fork.state == child_dead) {
  748. /* We need to remove stale worker */
  749. child = g_hash_table_lookup(srv->workers,
  750. GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid));
  751. if (child == NULL) {
  752. msg_err("cannot find child for a forked process %P (%P parent)",
  753. cmd->cmd.on_fork.cpid, cmd->cmd.on_fork.ppid);
  754. return;
  755. }
  756. REF_RELEASE(child->cf);
  757. g_hash_table_remove(srv->workers,
  758. GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid));
  759. g_hash_table_unref(child->control_events_pending);
  760. g_free(child);
  761. }
  762. else {
  763. child = g_malloc0(sizeof(struct rspamd_worker));
  764. child->srv = srv;
  765. child->type = parent->type;
  766. child->pid = cmd->cmd.on_fork.cpid;
  767. child->srv_pipe[0] = -1;
  768. child->srv_pipe[1] = -1;
  769. child->control_pipe[0] = -1;
  770. child->control_pipe[1] = -1;
  771. child->cf = parent->cf;
  772. child->ppid = parent->pid;
  773. REF_RETAIN(child->cf);
  774. child->control_events_pending = g_hash_table_new_full(g_direct_hash, g_direct_equal,
  775. NULL, rspamd_pending_control_free);
  776. g_hash_table_insert(srv->workers,
  777. GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid), child);
  778. }
  779. }
  780. static void
  781. rspamd_fill_health_reply(struct rspamd_main *srv, struct rspamd_srv_reply *rep)
  782. {
  783. GHashTableIter it;
  784. gpointer k, v;
  785. memset(&rep->reply.health, 0, sizeof(rep->reply));
  786. g_hash_table_iter_init(&it, srv->workers);
  787. while (g_hash_table_iter_next(&it, &k, &v)) {
  788. struct rspamd_worker *wrk = (struct rspamd_worker *) v;
  789. if (wrk->hb.nbeats < 0) {
  790. rep->reply.health.workers_hb_lost++;
  791. }
  792. else if (rspamd_worker_is_scanner(wrk)) {
  793. rep->reply.health.scanners_count++;
  794. }
  795. rep->reply.health.workers_count++;
  796. }
  797. rep->reply.status = (g_hash_table_size(srv->workers) > 0);
  798. }
  799. static void
  800. rspamd_srv_handler(EV_P_ ev_io *w, int revents)
  801. {
  802. struct rspamd_worker *worker;
  803. static struct rspamd_srv_command cmd;
  804. struct rspamd_main *rspamd_main;
  805. struct rspamd_srv_reply_data *rdata;
  806. struct msghdr msg;
  807. struct cmsghdr *cmsg;
  808. static struct iovec iov;
  809. static unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  810. int *spair, rfd = -1;
  811. char *nid;
  812. struct rspamd_control_command wcmd;
  813. gssize r;
  814. if (revents == EV_READ) {
  815. worker = (struct rspamd_worker *) w->data;
  816. rspamd_main = worker->srv;
  817. iov.iov_base = &cmd;
  818. iov.iov_len = sizeof(cmd);
  819. memset(&msg, 0, sizeof(msg));
  820. msg.msg_control = fdspace;
  821. msg.msg_controllen = sizeof(fdspace);
  822. msg.msg_iov = &iov;
  823. msg.msg_iovlen = 1;
  824. r = recvmsg(w->fd, &msg, 0);
  825. if (r == -1) {
  826. if (errno != EAGAIN) {
  827. msg_err_main("cannot read from worker's srv pipe: %s",
  828. strerror(errno));
  829. }
  830. else {
  831. return;
  832. }
  833. }
  834. else if (r == 0) {
  835. /*
  836. * Usually this means that a worker is dead, so do not try to read
  837. * anything
  838. */
  839. msg_err_main("cannot read from worker's srv pipe connection closed; command = %s",
  840. rspamd_srv_command_to_string(cmd.type));
  841. ev_io_stop(EV_A_ w);
  842. }
  843. else if (r != sizeof(cmd)) {
  844. msg_err_main("cannot read from worker's srv pipe incomplete command: %d != %d; command = %s",
  845. (int) r, (int) sizeof(cmd), rspamd_srv_command_to_string(cmd.type));
  846. }
  847. else {
  848. rdata = g_malloc0(sizeof(*rdata));
  849. rdata->worker = worker;
  850. rdata->srv = rspamd_main;
  851. rdata->rep.id = cmd.id;
  852. rdata->rep.type = cmd.type;
  853. rdata->fd = -1;
  854. worker->tmp_data = rdata;
  855. if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
  856. rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
  857. }
  858. switch (cmd.type) {
  859. case RSPAMD_SRV_SOCKETPAIR:
  860. spair = g_hash_table_lookup(rspamd_main->spairs, cmd.cmd.spair.pair_id);
  861. if (spair == NULL) {
  862. spair = g_malloc(sizeof(int) * 2);
  863. if (rspamd_socketpair(spair, cmd.cmd.spair.af) == -1) {
  864. rdata->rep.reply.spair.code = errno;
  865. msg_err_main("cannot create socket pair: %s", strerror(errno));
  866. }
  867. else {
  868. nid = g_malloc(sizeof(cmd.cmd.spair.pair_id));
  869. memcpy(nid, cmd.cmd.spair.pair_id,
  870. sizeof(cmd.cmd.spair.pair_id));
  871. g_hash_table_insert(rspamd_main->spairs, nid, spair);
  872. rdata->rep.reply.spair.code = 0;
  873. rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
  874. }
  875. }
  876. else {
  877. rdata->rep.reply.spair.code = 0;
  878. rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
  879. }
  880. break;
  881. case RSPAMD_SRV_HYPERSCAN_LOADED:
  882. #ifdef WITH_HYPERSCAN
  883. /* Load RE cache to provide it for new forks */
  884. if (rspamd_re_cache_is_hs_loaded(rspamd_main->cfg->re_cache) != RSPAMD_HYPERSCAN_LOADED_FULL ||
  885. cmd.cmd.hs_loaded.forced) {
  886. rspamd_re_cache_load_hyperscan(
  887. rspamd_main->cfg->re_cache,
  888. cmd.cmd.hs_loaded.cache_dir,
  889. false);
  890. }
  891. /* After getting this notice, we can clean up old hyperscan files */
  892. rspamd_hyperscan_notice_loaded();
  893. msg_info_main("received hyperscan cache loaded from %s",
  894. cmd.cmd.hs_loaded.cache_dir);
  895. /* Broadcast command to all workers */
  896. memset(&wcmd, 0, sizeof(wcmd));
  897. wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
  898. rspamd_strlcpy(wcmd.cmd.hs_loaded.cache_dir,
  899. cmd.cmd.hs_loaded.cache_dir,
  900. sizeof(wcmd.cmd.hs_loaded.cache_dir));
  901. wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
  902. rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
  903. rspamd_control_ignore_io_handler, NULL, worker->pid);
  904. #endif
  905. break;
  906. case RSPAMD_SRV_MONITORED_CHANGE:
  907. /* Broadcast command to all workers */
  908. memset(&wcmd, 0, sizeof(wcmd));
  909. wcmd.type = RSPAMD_CONTROL_MONITORED_CHANGE;
  910. rspamd_strlcpy(wcmd.cmd.monitored_change.tag,
  911. cmd.cmd.monitored_change.tag,
  912. sizeof(wcmd.cmd.monitored_change.tag));
  913. wcmd.cmd.monitored_change.alive = cmd.cmd.monitored_change.alive;
  914. wcmd.cmd.monitored_change.sender = cmd.cmd.monitored_change.sender;
  915. rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
  916. rspamd_control_ignore_io_handler, NULL, 0);
  917. break;
  918. case RSPAMD_SRV_LOG_PIPE:
  919. memset(&wcmd, 0, sizeof(wcmd));
  920. wcmd.type = RSPAMD_CONTROL_LOG_PIPE;
  921. wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type;
  922. rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
  923. rspamd_control_log_pipe_io_handler, NULL, 0);
  924. break;
  925. case RSPAMD_SRV_ON_FORK:
  926. rdata->rep.reply.on_fork.status = 0;
  927. rspamd_control_handle_on_fork(&cmd, rspamd_main);
  928. break;
  929. case RSPAMD_SRV_HEARTBEAT:
  930. worker->hb.last_event = ev_time();
  931. rdata->rep.reply.heartbeat.status = 0;
  932. break;
  933. case RSPAMD_SRV_HEALTH:
  934. rspamd_fill_health_reply(rspamd_main, &rdata->rep);
  935. break;
  936. case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
  937. #ifdef WITH_HYPERSCAN
  938. rspamd_hyperscan_notice_known(cmd.cmd.hyperscan_cache_file.path);
  939. #endif
  940. rdata->rep.reply.hyperscan_cache_file.unused = 0;
  941. break;
  942. case RSPAMD_SRV_FUZZY_BLOCKED:
  943. /* Broadcast command to all workers */
  944. memset(&wcmd, 0, sizeof(wcmd));
  945. wcmd.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
  946. /* Ensure that memcpy is safe */
  947. G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked));
  948. memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked));
  949. rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
  950. rspamd_control_ignore_io_handler, NULL, worker->pid);
  951. break;
  952. default:
  953. msg_err_main("unknown command type: %d", cmd.type);
  954. break;
  955. }
  956. if (rfd != -1) {
  957. /* Close our copy to avoid descriptors leak */
  958. close(rfd);
  959. }
  960. /* Now plan write event and send data back */
  961. w->data = rdata;
  962. ev_io_stop(EV_A_ w);
  963. ev_io_set(w, worker->srv_pipe[0], EV_WRITE);
  964. ev_io_start(EV_A_ w);
  965. }
  966. }
  967. else if (revents == EV_WRITE) {
  968. rdata = (struct rspamd_srv_reply_data *) w->data;
  969. worker = rdata->worker;
  970. worker->tmp_data = NULL; /* Avoid race */
  971. rspamd_main = rdata->srv;
  972. memset(&msg, 0, sizeof(msg));
  973. /* Attach fd to the message */
  974. if (rdata->fd != -1) {
  975. memset(fdspace, 0, sizeof(fdspace));
  976. msg.msg_control = fdspace;
  977. msg.msg_controllen = sizeof(fdspace);
  978. cmsg = CMSG_FIRSTHDR(&msg);
  979. cmsg->cmsg_level = SOL_SOCKET;
  980. cmsg->cmsg_type = SCM_RIGHTS;
  981. cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  982. memcpy(CMSG_DATA(cmsg), &rdata->fd, sizeof(int));
  983. }
  984. iov.iov_base = &rdata->rep;
  985. iov.iov_len = sizeof(rdata->rep);
  986. msg.msg_iov = &iov;
  987. msg.msg_iovlen = 1;
  988. r = sendmsg(w->fd, &msg, 0);
  989. if (r == -1) {
  990. msg_err_main("cannot write to worker's srv pipe when writing reply: %s; command = %s",
  991. strerror(errno), rspamd_srv_command_to_string(rdata->rep.type));
  992. }
  993. else if (r != sizeof(rdata->rep)) {
  994. msg_err_main("cannot write to worker's srv pipe: %d != %d; command = %s",
  995. (int) r, (int) sizeof(rdata->rep),
  996. rspamd_srv_command_to_string(rdata->rep.type));
  997. }
  998. g_free(rdata);
  999. w->data = worker;
  1000. ev_io_stop(EV_A_ w);
  1001. ev_io_set(w, worker->srv_pipe[0], EV_READ);
  1002. ev_io_start(EV_A_ w);
  1003. }
  1004. }
  1005. void rspamd_srv_start_watching(struct rspamd_main *srv,
  1006. struct rspamd_worker *worker,
  1007. struct ev_loop *ev_base)
  1008. {
  1009. g_assert(worker != NULL);
  1010. worker->tmp_data = NULL;
  1011. worker->srv_ev.data = worker;
  1012. ev_io_init(&worker->srv_ev, rspamd_srv_handler, worker->srv_pipe[0], EV_READ);
  1013. ev_io_start(ev_base, &worker->srv_ev);
  1014. }
  1015. struct rspamd_srv_request_data {
  1016. struct rspamd_worker *worker;
  1017. struct rspamd_srv_command cmd;
  1018. int attached_fd;
  1019. struct rspamd_srv_reply rep;
  1020. rspamd_srv_reply_handler handler;
  1021. ev_io io_ev;
  1022. gpointer ud;
  1023. };
  1024. static void
  1025. rspamd_srv_request_handler(EV_P_ ev_io *w, int revents)
  1026. {
  1027. struct rspamd_srv_request_data *rd = (struct rspamd_srv_request_data *) w->data;
  1028. struct msghdr msg;
  1029. struct iovec iov;
  1030. unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  1031. struct cmsghdr *cmsg;
  1032. gssize r;
  1033. int rfd = -1;
  1034. if (revents == EV_WRITE) {
  1035. /* Send request to server */
  1036. memset(&msg, 0, sizeof(msg));
  1037. /* Attach fd to the message */
  1038. if (rd->attached_fd != -1) {
  1039. memset(fdspace, 0, sizeof(fdspace));
  1040. msg.msg_control = fdspace;
  1041. msg.msg_controllen = sizeof(fdspace);
  1042. cmsg = CMSG_FIRSTHDR(&msg);
  1043. cmsg->cmsg_level = SOL_SOCKET;
  1044. cmsg->cmsg_type = SCM_RIGHTS;
  1045. cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  1046. memcpy(CMSG_DATA(cmsg), &rd->attached_fd, sizeof(int));
  1047. }
  1048. iov.iov_base = &rd->cmd;
  1049. iov.iov_len = sizeof(rd->cmd);
  1050. msg.msg_iov = &iov;
  1051. msg.msg_iovlen = 1;
  1052. r = sendmsg(w->fd, &msg, 0);
  1053. if (r == -1) {
  1054. if (r == ENOBUFS) {
  1055. /* On BSD derived systems we can have this error when trying to send
  1056. * requests too fast.
  1057. * It might be good to retry...
  1058. */
  1059. msg_info("cannot write to server pipe: %s; command = %s; retrying sending",
  1060. strerror(errno),
  1061. rspamd_srv_command_to_string(rd->cmd.type));
  1062. return;
  1063. }
  1064. msg_err("cannot write to server pipe: %s; command = %s", strerror(errno),
  1065. rspamd_srv_command_to_string(rd->cmd.type));
  1066. goto cleanup;
  1067. }
  1068. else if (r != sizeof(rd->cmd)) {
  1069. msg_err("incomplete write to the server pipe: %d != %d, command = %s",
  1070. (int) r, (int) sizeof(rd->cmd), rspamd_srv_command_to_string(rd->cmd.type));
  1071. goto cleanup;
  1072. }
  1073. ev_io_stop(EV_A_ w);
  1074. ev_io_set(w, rd->worker->srv_pipe[1], EV_READ);
  1075. ev_io_start(EV_A_ w);
  1076. }
  1077. else {
  1078. iov.iov_base = &rd->rep;
  1079. iov.iov_len = sizeof(rd->rep);
  1080. memset(&msg, 0, sizeof(msg));
  1081. msg.msg_control = fdspace;
  1082. msg.msg_controllen = sizeof(fdspace);
  1083. msg.msg_iov = &iov;
  1084. msg.msg_iovlen = 1;
  1085. r = recvmsg(w->fd, &msg, 0);
  1086. if (r == -1) {
  1087. msg_err("cannot read from server pipe: %s; command = %s", strerror(errno),
  1088. rspamd_srv_command_to_string(rd->cmd.type));
  1089. goto cleanup;
  1090. }
  1091. if (r != (int) sizeof(rd->rep)) {
  1092. msg_err("cannot read from server pipe, invalid length: %d != %d; command = %s",
  1093. (int) r, (int) sizeof(rd->rep), rspamd_srv_command_to_string(rd->cmd.type));
  1094. goto cleanup;
  1095. }
  1096. if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
  1097. rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
  1098. }
  1099. /* Reply has been received */
  1100. if (rd->handler) {
  1101. rd->handler(rd->worker, &rd->rep, rfd, rd->ud);
  1102. }
  1103. goto cleanup;
  1104. }
  1105. return;
  1106. cleanup:
  1107. ev_io_stop(EV_A_ w);
  1108. g_free(rd);
  1109. }
  1110. void rspamd_srv_send_command(struct rspamd_worker *worker,
  1111. struct ev_loop *ev_base,
  1112. struct rspamd_srv_command *cmd,
  1113. int attached_fd,
  1114. rspamd_srv_reply_handler handler,
  1115. gpointer ud)
  1116. {
  1117. struct rspamd_srv_request_data *rd;
  1118. g_assert(cmd != NULL);
  1119. g_assert(worker != NULL);
  1120. rd = g_malloc0(sizeof(*rd));
  1121. cmd->id = ottery_rand_uint64();
  1122. memcpy(&rd->cmd, cmd, sizeof(rd->cmd));
  1123. rd->handler = handler;
  1124. rd->ud = ud;
  1125. rd->worker = worker;
  1126. rd->rep.id = cmd->id;
  1127. rd->rep.type = cmd->type;
  1128. rd->attached_fd = attached_fd;
  1129. rd->io_ev.data = rd;
  1130. ev_io_init(&rd->io_ev, rspamd_srv_request_handler,
  1131. rd->worker->srv_pipe[1], EV_WRITE);
  1132. ev_io_start(ev_base, &rd->io_ev);
  1133. }
  1134. enum rspamd_control_type
  1135. rspamd_control_command_from_string(const char *str)
  1136. {
  1137. enum rspamd_control_type ret = RSPAMD_CONTROL_MAX;
  1138. if (!str) {
  1139. return ret;
  1140. }
  1141. if (g_ascii_strcasecmp(str, "hyperscan_loaded") == 0) {
  1142. ret = RSPAMD_CONTROL_HYPERSCAN_LOADED;
  1143. }
  1144. else if (g_ascii_strcasecmp(str, "stat") == 0) {
  1145. ret = RSPAMD_CONTROL_STAT;
  1146. }
  1147. else if (g_ascii_strcasecmp(str, "reload") == 0) {
  1148. ret = RSPAMD_CONTROL_RELOAD;
  1149. }
  1150. else if (g_ascii_strcasecmp(str, "reresolve") == 0) {
  1151. ret = RSPAMD_CONTROL_RERESOLVE;
  1152. }
  1153. else if (g_ascii_strcasecmp(str, "recompile") == 0) {
  1154. ret = RSPAMD_CONTROL_RECOMPILE;
  1155. }
  1156. else if (g_ascii_strcasecmp(str, "log_pipe") == 0) {
  1157. ret = RSPAMD_CONTROL_LOG_PIPE;
  1158. }
  1159. else if (g_ascii_strcasecmp(str, "fuzzy_stat") == 0) {
  1160. ret = RSPAMD_CONTROL_FUZZY_STAT;
  1161. }
  1162. else if (g_ascii_strcasecmp(str, "fuzzy_sync") == 0) {
  1163. ret = RSPAMD_CONTROL_FUZZY_SYNC;
  1164. }
  1165. else if (g_ascii_strcasecmp(str, "monitored_change") == 0) {
  1166. ret = RSPAMD_CONTROL_MONITORED_CHANGE;
  1167. }
  1168. else if (g_ascii_strcasecmp(str, "child_change") == 0) {
  1169. ret = RSPAMD_CONTROL_CHILD_CHANGE;
  1170. }
  1171. return ret;
  1172. }
  1173. const char *
  1174. rspamd_control_command_to_string(enum rspamd_control_type cmd)
  1175. {
  1176. const char *reply = "unknown";
  1177. switch (cmd) {
  1178. case RSPAMD_CONTROL_STAT:
  1179. reply = "stat";
  1180. break;
  1181. case RSPAMD_CONTROL_RELOAD:
  1182. reply = "reload";
  1183. break;
  1184. case RSPAMD_CONTROL_RERESOLVE:
  1185. reply = "reresolve";
  1186. break;
  1187. case RSPAMD_CONTROL_RECOMPILE:
  1188. reply = "recompile";
  1189. break;
  1190. case RSPAMD_CONTROL_HYPERSCAN_LOADED:
  1191. reply = "hyperscan_loaded";
  1192. break;
  1193. case RSPAMD_CONTROL_LOG_PIPE:
  1194. reply = "log_pipe";
  1195. break;
  1196. case RSPAMD_CONTROL_FUZZY_STAT:
  1197. reply = "fuzzy_stat";
  1198. break;
  1199. case RSPAMD_CONTROL_FUZZY_SYNC:
  1200. reply = "fuzzy_sync";
  1201. break;
  1202. case RSPAMD_CONTROL_MONITORED_CHANGE:
  1203. reply = "monitored_change";
  1204. break;
  1205. case RSPAMD_CONTROL_CHILD_CHANGE:
  1206. reply = "child_change";
  1207. break;
  1208. default:
  1209. break;
  1210. }
  1211. return reply;
  1212. }
  1213. const char *rspamd_srv_command_to_string(enum rspamd_srv_type cmd)
  1214. {
  1215. const char *reply = "unknown";
  1216. switch (cmd) {
  1217. case RSPAMD_SRV_SOCKETPAIR:
  1218. reply = "socketpair";
  1219. break;
  1220. case RSPAMD_SRV_HYPERSCAN_LOADED:
  1221. reply = "hyperscan_loaded";
  1222. break;
  1223. case RSPAMD_SRV_MONITORED_CHANGE:
  1224. reply = "monitored_change";
  1225. break;
  1226. case RSPAMD_SRV_LOG_PIPE:
  1227. reply = "log_pipe";
  1228. break;
  1229. case RSPAMD_SRV_ON_FORK:
  1230. reply = "on_fork";
  1231. break;
  1232. case RSPAMD_SRV_HEARTBEAT:
  1233. reply = "heartbeat";
  1234. break;
  1235. case RSPAMD_SRV_HEALTH:
  1236. reply = "health";
  1237. break;
  1238. case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
  1239. reply = "notice_hyperscan_cache";
  1240. break;
  1241. case RSPAMD_SRV_FUZZY_BLOCKED:
  1242. reply = "fuzzy_blocked";
  1243. break;
  1244. }
  1245. return reply;
  1246. }