You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

rspamd_control.c 40KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475
  1. /*
  2. * Copyright 2024 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rspamd.h"
  18. #include "rspamd_control.h"
  19. #include "worker_util.h"
  20. #include "libserver/http/http_connection.h"
  21. #include "libserver/http/http_private.h"
  22. #include "libutil/libev_helper.h"
  23. #include "unix-std.h"
  24. #include "utlist.h"
  25. #ifdef HAVE_SYS_RESOURCE_H
  26. #include <sys/resource.h>
  27. #endif
  28. #ifdef WITH_HYPERSCAN
  29. #include "hyperscan_tools.h"
  30. #endif
  31. #define msg_debug_control(...) rspamd_conditional_debug_fast(NULL, NULL, \
  32. rspamd_control_log_id, "control", NULL, \
  33. RSPAMD_LOG_FUNC, \
  34. __VA_ARGS__)
  35. static ev_tstamp io_timeout = 30.0;
  36. static ev_tstamp worker_io_timeout = 0.5;
  37. struct rspamd_control_session;
  38. struct rspamd_control_reply_elt {
  39. struct rspamd_control_reply reply;
  40. struct rspamd_io_ev ev;
  41. struct ev_loop *event_loop;
  42. struct rspamd_worker *worker;
  43. GQuark wrk_type;
  44. pid_t wrk_pid;
  45. rspamd_ev_cb handler;
  46. gpointer ud;
  47. int attached_fd;
  48. bool sent;
  49. struct rspamd_control_command cmd;
  50. GHashTable *pending_elts;
  51. struct rspamd_control_reply_elt *prev, *next;
  52. };
  53. struct rspamd_control_session {
  54. int fd;
  55. struct ev_loop *event_loop;
  56. struct rspamd_main *rspamd_main;
  57. struct rspamd_http_connection *conn;
  58. struct rspamd_control_command cmd;
  59. struct rspamd_control_reply_elt *replies;
  60. rspamd_inet_addr_t *addr;
  61. unsigned int replies_remain;
  62. gboolean is_reply;
  63. };
  64. static const struct rspamd_control_cmd_match {
  65. rspamd_ftok_t name;
  66. enum rspamd_control_type type;
  67. } cmd_matches[] = {
  68. {.name = {
  69. .begin = "/stat",
  70. .len = sizeof("/stat") - 1},
  71. .type = RSPAMD_CONTROL_STAT},
  72. {.name = {.begin = "/reload", .len = sizeof("/reload") - 1}, .type = RSPAMD_CONTROL_RELOAD},
  73. {.name = {.begin = "/reresolve", .len = sizeof("/reresolve") - 1}, .type = RSPAMD_CONTROL_RERESOLVE},
  74. {.name = {.begin = "/recompile", .len = sizeof("/recompile") - 1}, .type = RSPAMD_CONTROL_RECOMPILE},
  75. {.name = {.begin = "/fuzzystat", .len = sizeof("/fuzzystat") - 1}, .type = RSPAMD_CONTROL_FUZZY_STAT},
  76. {.name = {.begin = "/fuzzysync", .len = sizeof("/fuzzysync") - 1}, .type = RSPAMD_CONTROL_FUZZY_SYNC},
  77. };
  78. static void rspamd_control_ignore_io_handler(int fd, short what, void *ud);
  79. static void rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt);
  80. INIT_LOG_MODULE(control)
  81. void rspamd_control_send_error(struct rspamd_control_session *session,
  82. int code, const char *error_msg, ...)
  83. {
  84. struct rspamd_http_message *msg;
  85. rspamd_fstring_t *reply;
  86. va_list args;
  87. msg = rspamd_http_new_message(HTTP_RESPONSE);
  88. va_start(args, error_msg);
  89. msg->status = rspamd_fstring_new();
  90. rspamd_vprintf_fstring(&msg->status, error_msg, args);
  91. va_end(args);
  92. msg->date = time(NULL);
  93. msg->code = code;
  94. reply = rspamd_fstring_sized_new(msg->status->len + 16);
  95. rspamd_printf_fstring(&reply, "{\"error\":\"%V\"}", msg->status);
  96. rspamd_http_message_set_body_from_fstring_steal(msg, reply);
  97. rspamd_http_connection_reset(session->conn);
  98. rspamd_http_connection_write_message(session->conn,
  99. msg,
  100. NULL,
  101. "application/json",
  102. session,
  103. io_timeout);
  104. }
  105. static void
  106. rspamd_control_send_ucl(struct rspamd_control_session *session,
  107. ucl_object_t *obj)
  108. {
  109. struct rspamd_http_message *msg;
  110. rspamd_fstring_t *reply;
  111. msg = rspamd_http_new_message(HTTP_RESPONSE);
  112. msg->date = time(NULL);
  113. msg->code = 200;
  114. msg->status = rspamd_fstring_new_init("OK", 2);
  115. reply = rspamd_fstring_sized_new(BUFSIZ);
  116. rspamd_ucl_emit_fstring(obj, UCL_EMIT_JSON_COMPACT, &reply);
  117. rspamd_http_message_set_body_from_fstring_steal(msg, reply);
  118. rspamd_http_connection_reset(session->conn);
  119. rspamd_http_connection_write_message(session->conn,
  120. msg,
  121. NULL,
  122. "application/json",
  123. session,
  124. io_timeout);
  125. }
  126. static void
  127. rspamd_control_connection_close(struct rspamd_control_session *session)
  128. {
  129. struct rspamd_control_reply_elt *elt, *telt;
  130. struct rspamd_main *rspamd_main;
  131. rspamd_main = session->rspamd_main;
  132. msg_info_main("finished connection from %s",
  133. rspamd_inet_address_to_string(session->addr));
  134. DL_FOREACH_SAFE(session->replies, elt, telt)
  135. {
  136. rspamd_control_stop_pending(elt);
  137. }
  138. rspamd_inet_address_free(session->addr);
  139. rspamd_http_connection_unref(session->conn);
  140. close(session->fd);
  141. g_free(session);
  142. }
  143. static void
  144. rspamd_control_write_reply(struct rspamd_control_session *session)
  145. {
  146. ucl_object_t *rep, *cur, *workers;
  147. struct rspamd_control_reply_elt *elt;
  148. char tmpbuf[64];
  149. double total_utime = 0, total_systime = 0;
  150. struct ucl_parser *parser;
  151. unsigned int total_conns = 0;
  152. rep = ucl_object_typed_new(UCL_OBJECT);
  153. workers = ucl_object_typed_new(UCL_OBJECT);
  154. DL_FOREACH(session->replies, elt)
  155. {
  156. /* Skip incompatible worker for fuzzy_stat */
  157. if ((session->cmd.type == RSPAMD_CONTROL_FUZZY_STAT ||
  158. session->cmd.type == RSPAMD_CONTROL_FUZZY_SYNC) &&
  159. elt->wrk_type != g_quark_from_static_string("fuzzy")) {
  160. continue;
  161. }
  162. rspamd_snprintf(tmpbuf, sizeof(tmpbuf), "%P", elt->wrk_pid);
  163. cur = ucl_object_typed_new(UCL_OBJECT);
  164. ucl_object_insert_key(cur, ucl_object_fromstring(g_quark_to_string(elt->wrk_type)), "type", 0, false);
  165. switch (session->cmd.type) {
  166. case RSPAMD_CONTROL_STAT:
  167. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.stat.conns), "conns", 0, false);
  168. ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.stat.utime), "utime", 0, false);
  169. ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.stat.systime), "systime", 0, false);
  170. ucl_object_insert_key(cur, ucl_object_fromdouble(elt->reply.reply.stat.uptime), "uptime", 0, false);
  171. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.stat.maxrss), "maxrss", 0, false);
  172. total_utime += elt->reply.reply.stat.utime;
  173. total_systime += elt->reply.reply.stat.systime;
  174. total_conns += elt->reply.reply.stat.conns;
  175. break;
  176. case RSPAMD_CONTROL_RELOAD:
  177. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.reload.status), "status", 0, false);
  178. break;
  179. case RSPAMD_CONTROL_RECOMPILE:
  180. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.recompile.status), "status", 0, false);
  181. break;
  182. case RSPAMD_CONTROL_RERESOLVE:
  183. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.reresolve.status), "status", 0, false);
  184. break;
  185. case RSPAMD_CONTROL_FUZZY_STAT:
  186. if (elt->attached_fd != -1) {
  187. /* We have some data to parse */
  188. parser = ucl_parser_new(0);
  189. ucl_object_insert_key(cur,
  190. ucl_object_fromint(
  191. elt->reply.reply.fuzzy_stat.status),
  192. "status",
  193. 0,
  194. false);
  195. if (ucl_parser_add_fd(parser, elt->attached_fd)) {
  196. ucl_object_insert_key(cur, ucl_parser_get_object(parser),
  197. "data", 0, false);
  198. ucl_parser_free(parser);
  199. }
  200. else {
  201. ucl_object_insert_key(cur, ucl_object_fromstring(ucl_parser_get_error(parser)), "error", 0, false);
  202. ucl_parser_free(parser);
  203. }
  204. ucl_object_insert_key(cur,
  205. ucl_object_fromlstring(
  206. elt->reply.reply.fuzzy_stat.storage_id,
  207. MEMPOOL_UID_LEN - 1),
  208. "id",
  209. 0,
  210. false);
  211. }
  212. else {
  213. ucl_object_insert_key(cur,
  214. ucl_object_fromstring("missing file"),
  215. "error",
  216. 0,
  217. false);
  218. ucl_object_insert_key(cur,
  219. ucl_object_fromint(
  220. elt->reply.reply.fuzzy_stat.status),
  221. "status",
  222. 0,
  223. false);
  224. }
  225. break;
  226. case RSPAMD_CONTROL_FUZZY_SYNC:
  227. ucl_object_insert_key(cur, ucl_object_fromint(elt->reply.reply.fuzzy_sync.status), "status", 0, false);
  228. break;
  229. default:
  230. break;
  231. }
  232. if (elt->attached_fd != -1) {
  233. close(elt->attached_fd);
  234. elt->attached_fd = -1;
  235. }
  236. ucl_object_insert_key(workers, cur, tmpbuf, 0, true);
  237. }
  238. ucl_object_insert_key(rep, workers, "workers", 0, false);
  239. if (session->cmd.type == RSPAMD_CONTROL_STAT) {
  240. /* Total stats */
  241. cur = ucl_object_typed_new(UCL_OBJECT);
  242. ucl_object_insert_key(cur, ucl_object_fromint(total_conns), "conns", 0, false);
  243. ucl_object_insert_key(cur, ucl_object_fromdouble(total_utime), "utime", 0, false);
  244. ucl_object_insert_key(cur, ucl_object_fromdouble(total_systime), "systime", 0, false);
  245. ucl_object_insert_key(rep, cur, "total", 0, false);
  246. }
  247. rspamd_control_send_ucl(session, rep);
  248. ucl_object_unref(rep);
  249. }
  250. static void
  251. rspamd_control_wrk_io(int fd, short what, gpointer ud)
  252. {
  253. struct rspamd_control_reply_elt *elt = ud;
  254. struct rspamd_control_session *session;
  255. unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  256. struct iovec iov;
  257. struct msghdr msg;
  258. gssize r;
  259. session = elt->ud;
  260. elt->attached_fd = -1;
  261. if (what == EV_READ) {
  262. iov.iov_base = &elt->reply;
  263. iov.iov_len = sizeof(elt->reply);
  264. memset(&msg, 0, sizeof(msg));
  265. msg.msg_control = fdspace;
  266. msg.msg_controllen = sizeof(fdspace);
  267. msg.msg_iov = &iov;
  268. msg.msg_iovlen = 1;
  269. r = recvmsg(fd, &msg, 0);
  270. if (r == -1) {
  271. msg_err("cannot read reply from the worker %P (%s): %s",
  272. elt->wrk_pid, g_quark_to_string(elt->wrk_type),
  273. strerror(errno));
  274. }
  275. else if (r >= (gssize) sizeof(elt->reply)) {
  276. if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
  277. elt->attached_fd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
  278. }
  279. }
  280. }
  281. else {
  282. /* Timeout waiting */
  283. msg_warn("timeout waiting reply from %P (%s)",
  284. elt->wrk_pid, g_quark_to_string(elt->wrk_type));
  285. }
  286. session->replies_remain--;
  287. rspamd_ev_watcher_stop(session->event_loop,
  288. &elt->ev);
  289. if (session->replies_remain == 0) {
  290. rspamd_control_write_reply(session);
  291. }
  292. }
  293. static void
  294. rspamd_control_error_handler(struct rspamd_http_connection *conn, GError *err)
  295. {
  296. struct rspamd_control_session *session = conn->ud;
  297. struct rspamd_main *rspamd_main;
  298. rspamd_main = session->rspamd_main;
  299. if (!session->is_reply) {
  300. msg_info_main("abnormally closing control connection: %e", err);
  301. session->is_reply = TRUE;
  302. rspamd_control_send_error(session, err->code, "%s", err->message);
  303. }
  304. else {
  305. rspamd_control_connection_close(session);
  306. }
  307. }
  308. void rspamd_pending_control_free(gpointer p)
  309. {
  310. struct rspamd_control_reply_elt *rep_elt = (struct rspamd_control_reply_elt *) p;
  311. if (rep_elt->sent) {
  312. rspamd_ev_watcher_stop(rep_elt->event_loop, &rep_elt->ev);
  313. }
  314. else if (rep_elt->attached_fd != -1) {
  315. /* Only for non-sent requests! */
  316. close(rep_elt->attached_fd);
  317. }
  318. g_hash_table_unref(rep_elt->pending_elts);
  319. g_free(rep_elt);
  320. }
  321. static inline void
  322. rspamd_control_fill_msghdr(struct rspamd_control_command *cmd,
  323. int attached_fd, struct msghdr *msg,
  324. struct iovec *iov)
  325. {
  326. struct cmsghdr *cmsg;
  327. unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  328. memset(msg, 0, sizeof(*msg));
  329. /* Attach fd to the message */
  330. if (attached_fd != -1) {
  331. memset(fdspace, 0, sizeof(fdspace));
  332. msg->msg_control = fdspace;
  333. msg->msg_controllen = sizeof(fdspace);
  334. cmsg = CMSG_FIRSTHDR(msg);
  335. cmsg->cmsg_level = SOL_SOCKET;
  336. cmsg->cmsg_type = SCM_RIGHTS;
  337. cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  338. memcpy(CMSG_DATA(cmsg), &attached_fd, sizeof(int));
  339. }
  340. iov->iov_base = cmd;
  341. iov->iov_len = sizeof(*cmd);
  342. msg->msg_iov = iov;
  343. msg->msg_iovlen = 1;
  344. }
  345. static void
  346. rspamd_control_stop_pending(struct rspamd_control_reply_elt *elt)
  347. {
  348. GHashTable *htb;
  349. struct rspamd_main *rspamd_main;
  350. gsize pending;
  351. /* It stops event and frees hash */
  352. htb = elt->pending_elts;
  353. pending = g_hash_table_size(htb);
  354. msg_debug_control("stop pending for %P(%s), %d events pending", elt->wrk_pid,
  355. g_quark_to_string(elt->wrk_type),
  356. (int) pending);
  357. if (elt->worker->state != rspamd_worker_state_terminating && pending != 0) {
  358. /* Invoke another event from the queue */
  359. GHashTableIter it;
  360. gpointer k, v;
  361. g_hash_table_iter_init(&it, elt->pending_elts);
  362. while (g_hash_table_iter_next(&it, &k, &v)) {
  363. struct rspamd_control_reply_elt *cur = v;
  364. if (!cur->sent) {
  365. struct msghdr msg;
  366. struct iovec iov;
  367. rspamd_main = cur->worker->srv;
  368. rspamd_control_fill_msghdr(&cur->cmd, cur->attached_fd, &msg, &iov);
  369. ssize_t r = sendmsg(cur->worker->control_pipe[0], &msg, 0);
  370. if (r == sizeof(cur->cmd)) {
  371. msg_debug_control("restarting pending event for %P(%s), %d events pending",
  372. cur->wrk_pid,
  373. g_quark_to_string(cur->wrk_type),
  374. (int) pending - 1);
  375. rspamd_ev_watcher_init(&cur->ev,
  376. cur->worker->control_pipe[0],
  377. EV_READ, cur->handler,
  378. cur);
  379. rspamd_ev_watcher_start(cur->event_loop,
  380. &cur->ev, worker_io_timeout);
  381. cur->sent = true;
  382. if (cur->attached_fd != -1) {
  383. /* Since `sendmsg` performs `dup` for us, we need to remove our own descriptor */
  384. close(cur->attached_fd);
  385. cur->attached_fd = -1;
  386. }
  387. break; /* Exit the outer loop as we have invoked something */
  388. }
  389. else {
  390. msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
  391. (int) cur->cmd.type,
  392. cur->wrk_pid,
  393. g_quark_to_string(cur->wrk_type),
  394. cur->worker->control_pipe[0],
  395. strerror(errno));
  396. g_hash_table_remove(elt->pending_elts, cur);
  397. }
  398. }
  399. }
  400. }
  401. /* Remove from hash and performs the cleanup */
  402. g_hash_table_remove(elt->pending_elts, elt);
  403. }
  404. static struct rspamd_control_reply_elt *
  405. rspamd_control_broadcast_cmd(struct rspamd_main *rspamd_main,
  406. struct rspamd_control_command *cmd,
  407. int attached_fd,
  408. rspamd_ev_cb handler,
  409. gpointer ud,
  410. pid_t except_pid)
  411. {
  412. GHashTableIter it;
  413. struct rspamd_worker *wrk;
  414. struct rspamd_control_reply_elt *rep_elt, *res = NULL;
  415. gpointer k, v;
  416. gssize r;
  417. g_hash_table_iter_init(&it, rspamd_main->workers);
  418. while (g_hash_table_iter_next(&it, &k, &v)) {
  419. wrk = v;
  420. /* No control pipe */
  421. if (wrk->control_pipe[0] == -1) {
  422. continue;
  423. }
  424. if (except_pid != 0 && wrk->pid == except_pid) {
  425. continue;
  426. }
  427. /* Worker is terminating, do not bother sending stuff */
  428. if (wrk->state == rspamd_worker_state_terminating) {
  429. continue;
  430. }
  431. rep_elt = g_malloc0(sizeof(*rep_elt));
  432. rep_elt->worker = wrk;
  433. rep_elt->wrk_pid = wrk->pid;
  434. rep_elt->wrk_type = wrk->type;
  435. rep_elt->event_loop = rspamd_main->event_loop;
  436. rep_elt->ud = ud;
  437. rep_elt->handler = handler;
  438. memcpy(&rep_elt->cmd, cmd, sizeof(*cmd));
  439. rep_elt->sent = false;
  440. rep_elt->attached_fd = -1;
  441. if (g_hash_table_size(wrk->control_events_pending) == 0) {
  442. /* We can send command */
  443. struct msghdr msg;
  444. struct iovec iov;
  445. rspamd_control_fill_msghdr(cmd, attached_fd, &msg, &iov);
  446. r = sendmsg(wrk->control_pipe[0], &msg, 0);
  447. if (r == sizeof(*cmd)) {
  448. rspamd_ev_watcher_init(&rep_elt->ev,
  449. wrk->control_pipe[0],
  450. EV_READ, handler,
  451. rep_elt);
  452. rspamd_ev_watcher_start(rspamd_main->event_loop,
  453. &rep_elt->ev, worker_io_timeout);
  454. rep_elt->sent = true;
  455. rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
  456. g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
  457. DL_APPEND(res, rep_elt);
  458. msg_debug_control("sent command %d to the worker %P(%s), fd: %d",
  459. (int) cmd->type,
  460. wrk->pid,
  461. g_quark_to_string(wrk->type),
  462. wrk->control_pipe[0]);
  463. }
  464. else {
  465. msg_err_main("cannot write command %d to the worker %P(%s), fd: %d: %s",
  466. (int) cmd->type,
  467. wrk->pid,
  468. g_quark_to_string(wrk->type),
  469. wrk->control_pipe[0],
  470. strerror(errno));
  471. g_free(rep_elt);
  472. }
  473. }
  474. else {
  475. /* We need to wait till the last command is processed, or it will mess up all serialization */
  476. msg_debug_control("pending event for %P(%s), %d events pending",
  477. wrk->pid,
  478. g_quark_to_string(wrk->type),
  479. (int) g_hash_table_size(wrk->control_events_pending));
  480. rep_elt->pending_elts = g_hash_table_ref(wrk->control_events_pending);
  481. /*
  482. * Here are dragons:
  483. * If we have a descriptor to send, the callee expects that we follow
  484. * sendmsg semantics that performs `dup` on it. So we need to clone fd and keep it there.
  485. */
  486. if (attached_fd != -1) {
  487. rep_elt->attached_fd = dup(attached_fd);
  488. if (rep_elt->attached_fd == -1) {
  489. /*
  490. * We have a problem: file descriptors limit is reached, so we cannot really deal with this
  491. * request
  492. */
  493. msg_err_main("cannot duplicate file descriptor to send command to worker %P(%s): %s; failed to send command",
  494. wrk->pid,
  495. g_quark_to_string(wrk->type),
  496. strerror(errno));
  497. g_hash_table_unref(rep_elt->pending_elts);
  498. g_free(rep_elt);
  499. }
  500. else {
  501. g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
  502. DL_APPEND(res, rep_elt);
  503. }
  504. }
  505. else {
  506. g_hash_table_insert(wrk->control_events_pending, rep_elt, rep_elt);
  507. DL_APPEND(res, rep_elt);
  508. }
  509. }
  510. }
  511. return res;
  512. }
  513. void rspamd_control_broadcast_srv_cmd(struct rspamd_main *rspamd_main,
  514. struct rspamd_control_command *cmd,
  515. pid_t except_pid)
  516. {
  517. rspamd_control_broadcast_cmd(rspamd_main, cmd, -1,
  518. rspamd_control_ignore_io_handler, NULL, except_pid);
  519. }
  520. static int
  521. rspamd_control_finish_handler(struct rspamd_http_connection *conn,
  522. struct rspamd_http_message *msg)
  523. {
  524. struct rspamd_control_session *session = conn->ud;
  525. rspamd_ftok_t srch;
  526. unsigned int i;
  527. gboolean found = FALSE;
  528. struct rspamd_control_reply_elt *cur;
  529. if (!session->is_reply) {
  530. if (msg->url == NULL) {
  531. rspamd_control_connection_close(session);
  532. return 0;
  533. }
  534. srch.begin = msg->url->str;
  535. srch.len = msg->url->len;
  536. session->is_reply = TRUE;
  537. for (i = 0; i < G_N_ELEMENTS(cmd_matches); i++) {
  538. if (rspamd_ftok_casecmp(&srch, &cmd_matches[i].name) == 0) {
  539. session->cmd.type = cmd_matches[i].type;
  540. found = TRUE;
  541. break;
  542. }
  543. }
  544. if (!found) {
  545. rspamd_control_send_error(session, 404, "Command not defined");
  546. }
  547. else {
  548. /* Send command to all workers */
  549. session->replies = rspamd_control_broadcast_cmd(
  550. session->rspamd_main, &session->cmd, -1,
  551. rspamd_control_wrk_io, session, 0);
  552. DL_FOREACH(session->replies, cur)
  553. {
  554. session->replies_remain++;
  555. }
  556. }
  557. }
  558. else {
  559. rspamd_control_connection_close(session);
  560. }
  561. return 0;
  562. }
  563. void rspamd_control_process_client_socket(struct rspamd_main *rspamd_main,
  564. int fd, rspamd_inet_addr_t *addr)
  565. {
  566. struct rspamd_control_session *session;
  567. session = g_malloc0(sizeof(*session));
  568. session->fd = fd;
  569. session->conn = rspamd_http_connection_new_server(rspamd_main->http_ctx,
  570. fd,
  571. NULL,
  572. rspamd_control_error_handler,
  573. rspamd_control_finish_handler,
  574. 0);
  575. session->rspamd_main = rspamd_main;
  576. session->addr = addr;
  577. session->event_loop = rspamd_main->event_loop;
  578. rspamd_http_connection_read_message(session->conn, session,
  579. io_timeout);
  580. }
  581. struct rspamd_worker_control_data {
  582. ev_io io_ev;
  583. struct rspamd_worker *worker;
  584. struct ev_loop *ev_base;
  585. struct {
  586. rspamd_worker_control_handler handler;
  587. gpointer ud;
  588. } handlers[RSPAMD_CONTROL_MAX];
  589. };
  590. static void
  591. rspamd_control_default_cmd_handler(int fd,
  592. int attached_fd,
  593. struct rspamd_worker_control_data *cd,
  594. struct rspamd_control_command *cmd)
  595. {
  596. struct rspamd_control_reply rep;
  597. gssize r;
  598. struct rusage rusg;
  599. struct rspamd_config *cfg;
  600. struct rspamd_main *rspamd_main;
  601. memset(&rep, 0, sizeof(rep));
  602. rep.type = cmd->type;
  603. rspamd_main = cd->worker->srv;
  604. switch (cmd->type) {
  605. case RSPAMD_CONTROL_STAT:
  606. if (getrusage(RUSAGE_SELF, &rusg) == -1) {
  607. msg_err_main("cannot get rusage stats: %s",
  608. strerror(errno));
  609. }
  610. else {
  611. rep.reply.stat.utime = tv_to_double(&rusg.ru_utime);
  612. rep.reply.stat.systime = tv_to_double(&rusg.ru_stime);
  613. rep.reply.stat.maxrss = rusg.ru_maxrss;
  614. }
  615. rep.reply.stat.conns = cd->worker->nconns;
  616. rep.reply.stat.uptime = rspamd_get_calendar_ticks() - cd->worker->start_time;
  617. break;
  618. case RSPAMD_CONTROL_RELOAD:
  619. case RSPAMD_CONTROL_RECOMPILE:
  620. case RSPAMD_CONTROL_HYPERSCAN_LOADED:
  621. case RSPAMD_CONTROL_MONITORED_CHANGE:
  622. case RSPAMD_CONTROL_FUZZY_STAT:
  623. case RSPAMD_CONTROL_FUZZY_SYNC:
  624. case RSPAMD_CONTROL_LOG_PIPE:
  625. case RSPAMD_CONTROL_CHILD_CHANGE:
  626. case RSPAMD_CONTROL_FUZZY_BLOCKED:
  627. break;
  628. case RSPAMD_CONTROL_RERESOLVE:
  629. if (cd->worker->srv->cfg) {
  630. REF_RETAIN(cd->worker->srv->cfg);
  631. cfg = cd->worker->srv->cfg;
  632. if (cfg->ups_ctx) {
  633. msg_info_config("reresolving upstreams");
  634. rspamd_upstream_reresolve(cfg->ups_ctx);
  635. }
  636. rep.reply.reresolve.status = 0;
  637. REF_RELEASE(cfg);
  638. }
  639. else {
  640. rep.reply.reresolve.status = EINVAL;
  641. }
  642. break;
  643. default:
  644. break;
  645. }
  646. r = write(fd, &rep, sizeof(rep));
  647. if (r != sizeof(rep)) {
  648. msg_err_main("cannot write reply to the control socket: %s",
  649. strerror(errno));
  650. }
  651. if (attached_fd != -1) {
  652. close(attached_fd);
  653. }
  654. }
  655. static void
  656. rspamd_control_default_worker_handler(EV_P_ ev_io *w, int revents)
  657. {
  658. struct rspamd_worker_control_data *cd =
  659. (struct rspamd_worker_control_data *) w->data;
  660. static struct rspamd_control_command cmd;
  661. static struct msghdr msg;
  662. static struct iovec iov;
  663. static unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  664. int rfd = -1;
  665. gssize r;
  666. iov.iov_base = &cmd;
  667. iov.iov_len = sizeof(cmd);
  668. memset(&msg, 0, sizeof(msg));
  669. msg.msg_control = fdspace;
  670. msg.msg_controllen = sizeof(fdspace);
  671. msg.msg_iov = &iov;
  672. msg.msg_iovlen = 1;
  673. r = recvmsg(w->fd, &msg, 0);
  674. if (r == -1) {
  675. if (errno != EAGAIN && errno != EINTR) {
  676. if (errno != ECONNRESET) {
  677. /*
  678. * In case of connection reset it means that main process
  679. * has died, so do not pollute logs
  680. */
  681. msg_err("cannot read request from the control socket: %s",
  682. strerror(errno));
  683. }
  684. ev_io_stop(cd->ev_base, &cd->io_ev);
  685. close(w->fd);
  686. }
  687. }
  688. else if (r < (int) sizeof(cmd)) {
  689. msg_err("short read of control command: %d of %d", (int) r,
  690. (int) sizeof(cmd));
  691. if (r == 0) {
  692. ev_io_stop(cd->ev_base, &cd->io_ev);
  693. close(w->fd);
  694. }
  695. }
  696. else if ((int) cmd.type >= 0 && cmd.type < RSPAMD_CONTROL_MAX) {
  697. if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
  698. rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
  699. }
  700. if (cd->handlers[cmd.type].handler) {
  701. cd->handlers[cmd.type].handler(cd->worker->srv,
  702. cd->worker,
  703. w->fd,
  704. rfd,
  705. &cmd,
  706. cd->handlers[cmd.type].ud);
  707. }
  708. else {
  709. rspamd_control_default_cmd_handler(w->fd, rfd, cd, &cmd);
  710. }
  711. }
  712. else {
  713. msg_err("unknown command: %d", (int) cmd.type);
  714. }
  715. }
  716. void rspamd_control_worker_add_default_cmd_handlers(struct rspamd_worker *worker,
  717. struct ev_loop *ev_base)
  718. {
  719. struct rspamd_worker_control_data *cd;
  720. cd = g_malloc0(sizeof(*cd));
  721. cd->worker = worker;
  722. cd->ev_base = ev_base;
  723. cd->io_ev.data = cd;
  724. ev_io_init(&cd->io_ev, rspamd_control_default_worker_handler,
  725. worker->control_pipe[1], EV_READ);
  726. ev_io_start(ev_base, &cd->io_ev);
  727. worker->control_data = cd;
  728. }
  729. /**
  730. * Register custom handler for a specific control command for this worker
  731. */
  732. void rspamd_control_worker_add_cmd_handler(struct rspamd_worker *worker,
  733. enum rspamd_control_type type,
  734. rspamd_worker_control_handler handler,
  735. gpointer ud)
  736. {
  737. struct rspamd_worker_control_data *cd;
  738. g_assert(type >= 0 && type < RSPAMD_CONTROL_MAX);
  739. g_assert(handler != NULL);
  740. g_assert(worker->control_data != NULL);
  741. cd = worker->control_data;
  742. cd->handlers[type].handler = handler;
  743. cd->handlers[type].ud = ud;
  744. }
  745. struct rspamd_srv_reply_data {
  746. struct rspamd_worker *worker;
  747. struct rspamd_main *srv;
  748. int fd;
  749. struct rspamd_srv_reply rep;
  750. };
  751. static void
  752. rspamd_control_ignore_io_handler(int fd, short what, void *ud)
  753. {
  754. struct rspamd_control_reply_elt *elt =
  755. (struct rspamd_control_reply_elt *) ud;
  756. struct rspamd_control_reply rep;
  757. /* At this point we just ignore replies from the workers */
  758. if (read(fd, &rep, sizeof(rep)) == -1) {
  759. msg_debug_control("cannot read %d bytes: %s", (int) sizeof(rep), strerror(errno));
  760. }
  761. rspamd_control_stop_pending(elt);
  762. }
  763. static void
  764. rspamd_control_log_pipe_io_handler(int fd, short what, void *ud)
  765. {
  766. struct rspamd_control_reply_elt *elt =
  767. (struct rspamd_control_reply_elt *) ud;
  768. struct rspamd_control_reply rep;
  769. /* At this point we just ignore replies from the workers */
  770. (void) !read(fd, &rep, sizeof(rep));
  771. rspamd_control_stop_pending(elt);
  772. }
  773. static void
  774. rspamd_control_handle_on_fork(struct rspamd_srv_command *cmd,
  775. struct rspamd_main *srv)
  776. {
  777. struct rspamd_worker *parent, *child;
  778. parent = g_hash_table_lookup(srv->workers,
  779. GSIZE_TO_POINTER(cmd->cmd.on_fork.ppid));
  780. if (parent == NULL) {
  781. msg_err("cannot find parent for a forked process %P (%P child)",
  782. cmd->cmd.on_fork.ppid, cmd->cmd.on_fork.cpid);
  783. return;
  784. }
  785. if (cmd->cmd.on_fork.state == child_dead) {
  786. /* We need to remove stale worker */
  787. child = g_hash_table_lookup(srv->workers,
  788. GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid));
  789. if (child == NULL) {
  790. msg_err("cannot find child for a forked process %P (%P parent)",
  791. cmd->cmd.on_fork.cpid, cmd->cmd.on_fork.ppid);
  792. return;
  793. }
  794. REF_RELEASE(child->cf);
  795. g_hash_table_remove(srv->workers,
  796. GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid));
  797. g_hash_table_unref(child->control_events_pending);
  798. g_free(child);
  799. }
  800. else {
  801. child = g_malloc0(sizeof(struct rspamd_worker));
  802. child->srv = srv;
  803. child->type = parent->type;
  804. child->pid = cmd->cmd.on_fork.cpid;
  805. child->srv_pipe[0] = -1;
  806. child->srv_pipe[1] = -1;
  807. child->control_pipe[0] = -1;
  808. child->control_pipe[1] = -1;
  809. child->cf = parent->cf;
  810. child->ppid = parent->pid;
  811. REF_RETAIN(child->cf);
  812. child->control_events_pending = g_hash_table_new_full(g_direct_hash, g_direct_equal,
  813. NULL, rspamd_pending_control_free);
  814. g_hash_table_insert(srv->workers,
  815. GSIZE_TO_POINTER(cmd->cmd.on_fork.cpid), child);
  816. }
  817. }
  818. static void
  819. rspamd_fill_health_reply(struct rspamd_main *srv, struct rspamd_srv_reply *rep)
  820. {
  821. GHashTableIter it;
  822. gpointer k, v;
  823. memset(&rep->reply.health, 0, sizeof(rep->reply));
  824. g_hash_table_iter_init(&it, srv->workers);
  825. while (g_hash_table_iter_next(&it, &k, &v)) {
  826. struct rspamd_worker *wrk = (struct rspamd_worker *) v;
  827. if (wrk->hb.nbeats < 0) {
  828. rep->reply.health.workers_hb_lost++;
  829. }
  830. else if (rspamd_worker_is_scanner(wrk)) {
  831. rep->reply.health.scanners_count++;
  832. }
  833. rep->reply.health.workers_count++;
  834. }
  835. rep->reply.status = (g_hash_table_size(srv->workers) > 0);
  836. }
  837. static void
  838. rspamd_srv_handler(EV_P_ ev_io *w, int revents)
  839. {
  840. struct rspamd_worker *worker;
  841. static struct rspamd_srv_command cmd;
  842. struct rspamd_main *rspamd_main;
  843. struct rspamd_srv_reply_data *rdata;
  844. struct msghdr msg;
  845. struct cmsghdr *cmsg;
  846. static struct iovec iov;
  847. static unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  848. int *spair, rfd = -1;
  849. char *nid;
  850. struct rspamd_control_command wcmd;
  851. gssize r;
  852. if (revents == EV_READ) {
  853. worker = (struct rspamd_worker *) w->data;
  854. rspamd_main = worker->srv;
  855. iov.iov_base = &cmd;
  856. iov.iov_len = sizeof(cmd);
  857. memset(&msg, 0, sizeof(msg));
  858. msg.msg_control = fdspace;
  859. msg.msg_controllen = sizeof(fdspace);
  860. msg.msg_iov = &iov;
  861. msg.msg_iovlen = 1;
  862. r = recvmsg(w->fd, &msg, 0);
  863. if (r == -1) {
  864. if (errno != EAGAIN) {
  865. msg_err_main("cannot read from worker's srv pipe: %s",
  866. strerror(errno));
  867. }
  868. else {
  869. return;
  870. }
  871. }
  872. else if (r == 0) {
  873. /*
  874. * Usually this means that a worker is dead, so do not try to read
  875. * anything
  876. */
  877. msg_err_main("cannot read from worker's srv pipe connection closed; command = %s",
  878. rspamd_srv_command_to_string(cmd.type));
  879. ev_io_stop(EV_A_ w);
  880. }
  881. else if (r != sizeof(cmd)) {
  882. msg_err_main("cannot read from worker's srv pipe incomplete command: %d != %d; command = %s",
  883. (int) r, (int) sizeof(cmd), rspamd_srv_command_to_string(cmd.type));
  884. }
  885. else {
  886. rdata = g_malloc0(sizeof(*rdata));
  887. rdata->worker = worker;
  888. rdata->srv = rspamd_main;
  889. rdata->rep.id = cmd.id;
  890. rdata->rep.type = cmd.type;
  891. rdata->fd = -1;
  892. worker->tmp_data = rdata;
  893. if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
  894. rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
  895. }
  896. switch (cmd.type) {
  897. case RSPAMD_SRV_SOCKETPAIR:
  898. spair = g_hash_table_lookup(rspamd_main->spairs, cmd.cmd.spair.pair_id);
  899. if (spair == NULL) {
  900. spair = g_malloc(sizeof(int) * 2);
  901. if (rspamd_socketpair(spair, cmd.cmd.spair.af) == -1) {
  902. rdata->rep.reply.spair.code = errno;
  903. msg_err_main("cannot create socket pair: %s", strerror(errno));
  904. }
  905. else {
  906. nid = g_malloc(sizeof(cmd.cmd.spair.pair_id));
  907. memcpy(nid, cmd.cmd.spair.pair_id,
  908. sizeof(cmd.cmd.spair.pair_id));
  909. g_hash_table_insert(rspamd_main->spairs, nid, spair);
  910. rdata->rep.reply.spair.code = 0;
  911. rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
  912. }
  913. }
  914. else {
  915. rdata->rep.reply.spair.code = 0;
  916. rdata->fd = cmd.cmd.spair.pair_num ? spair[1] : spair[0];
  917. }
  918. break;
  919. case RSPAMD_SRV_HYPERSCAN_LOADED:
  920. #ifdef WITH_HYPERSCAN
  921. /* Load RE cache to provide it for new forks */
  922. if (rspamd_re_cache_is_hs_loaded(rspamd_main->cfg->re_cache) != RSPAMD_HYPERSCAN_LOADED_FULL ||
  923. cmd.cmd.hs_loaded.forced) {
  924. rspamd_re_cache_load_hyperscan(
  925. rspamd_main->cfg->re_cache,
  926. cmd.cmd.hs_loaded.cache_dir,
  927. false);
  928. }
  929. /* After getting this notice, we can clean up old hyperscan files */
  930. rspamd_hyperscan_notice_loaded();
  931. msg_info_main("received hyperscan cache loaded from %s",
  932. cmd.cmd.hs_loaded.cache_dir);
  933. /* Broadcast command to all workers */
  934. memset(&wcmd, 0, sizeof(wcmd));
  935. wcmd.type = RSPAMD_CONTROL_HYPERSCAN_LOADED;
  936. rspamd_strlcpy(wcmd.cmd.hs_loaded.cache_dir,
  937. cmd.cmd.hs_loaded.cache_dir,
  938. sizeof(wcmd.cmd.hs_loaded.cache_dir));
  939. wcmd.cmd.hs_loaded.forced = cmd.cmd.hs_loaded.forced;
  940. rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
  941. rspamd_control_ignore_io_handler, NULL, worker->pid);
  942. #endif
  943. break;
  944. case RSPAMD_SRV_MONITORED_CHANGE:
  945. /* Broadcast command to all workers */
  946. memset(&wcmd, 0, sizeof(wcmd));
  947. wcmd.type = RSPAMD_CONTROL_MONITORED_CHANGE;
  948. rspamd_strlcpy(wcmd.cmd.monitored_change.tag,
  949. cmd.cmd.monitored_change.tag,
  950. sizeof(wcmd.cmd.monitored_change.tag));
  951. wcmd.cmd.monitored_change.alive = cmd.cmd.monitored_change.alive;
  952. wcmd.cmd.monitored_change.sender = cmd.cmd.monitored_change.sender;
  953. rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
  954. rspamd_control_ignore_io_handler, NULL, 0);
  955. break;
  956. case RSPAMD_SRV_LOG_PIPE:
  957. memset(&wcmd, 0, sizeof(wcmd));
  958. wcmd.type = RSPAMD_CONTROL_LOG_PIPE;
  959. wcmd.cmd.log_pipe.type = cmd.cmd.log_pipe.type;
  960. rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
  961. rspamd_control_log_pipe_io_handler, NULL, 0);
  962. break;
  963. case RSPAMD_SRV_ON_FORK:
  964. rdata->rep.reply.on_fork.status = 0;
  965. rspamd_control_handle_on_fork(&cmd, rspamd_main);
  966. break;
  967. case RSPAMD_SRV_HEARTBEAT:
  968. worker->hb.last_event = ev_time();
  969. rdata->rep.reply.heartbeat.status = 0;
  970. break;
  971. case RSPAMD_SRV_HEALTH:
  972. rspamd_fill_health_reply(rspamd_main, &rdata->rep);
  973. break;
  974. case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
  975. #ifdef WITH_HYPERSCAN
  976. rspamd_hyperscan_notice_known(cmd.cmd.hyperscan_cache_file.path);
  977. #endif
  978. rdata->rep.reply.hyperscan_cache_file.unused = 0;
  979. break;
  980. case RSPAMD_SRV_FUZZY_BLOCKED:
  981. /* Broadcast command to all workers */
  982. memset(&wcmd, 0, sizeof(wcmd));
  983. wcmd.type = RSPAMD_CONTROL_FUZZY_BLOCKED;
  984. /* Ensure that memcpy is safe */
  985. G_STATIC_ASSERT(sizeof(wcmd.cmd.fuzzy_blocked) == sizeof(cmd.cmd.fuzzy_blocked));
  986. memcpy(&wcmd.cmd.fuzzy_blocked, &cmd.cmd.fuzzy_blocked, sizeof(wcmd.cmd.fuzzy_blocked));
  987. rspamd_control_broadcast_cmd(rspamd_main, &wcmd, rfd,
  988. rspamd_control_ignore_io_handler, NULL, worker->pid);
  989. break;
  990. default:
  991. msg_err_main("unknown command type: %d", cmd.type);
  992. break;
  993. }
  994. if (rfd != -1) {
  995. /* Close our copy to avoid descriptors leak */
  996. close(rfd);
  997. }
  998. /* Now plan write event and send data back */
  999. w->data = rdata;
  1000. ev_io_stop(EV_A_ w);
  1001. ev_io_set(w, worker->srv_pipe[0], EV_WRITE);
  1002. ev_io_start(EV_A_ w);
  1003. }
  1004. }
  1005. else if (revents == EV_WRITE) {
  1006. rdata = (struct rspamd_srv_reply_data *) w->data;
  1007. worker = rdata->worker;
  1008. worker->tmp_data = NULL; /* Avoid race */
  1009. rspamd_main = rdata->srv;
  1010. memset(&msg, 0, sizeof(msg));
  1011. /* Attach fd to the message */
  1012. if (rdata->fd != -1) {
  1013. memset(fdspace, 0, sizeof(fdspace));
  1014. msg.msg_control = fdspace;
  1015. msg.msg_controllen = sizeof(fdspace);
  1016. cmsg = CMSG_FIRSTHDR(&msg);
  1017. cmsg->cmsg_level = SOL_SOCKET;
  1018. cmsg->cmsg_type = SCM_RIGHTS;
  1019. cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  1020. memcpy(CMSG_DATA(cmsg), &rdata->fd, sizeof(int));
  1021. }
  1022. iov.iov_base = &rdata->rep;
  1023. iov.iov_len = sizeof(rdata->rep);
  1024. msg.msg_iov = &iov;
  1025. msg.msg_iovlen = 1;
  1026. r = sendmsg(w->fd, &msg, 0);
  1027. if (r == -1) {
  1028. msg_err_main("cannot write to worker's srv pipe when writing reply: %s; command = %s",
  1029. strerror(errno), rspamd_srv_command_to_string(rdata->rep.type));
  1030. }
  1031. else if (r != sizeof(rdata->rep)) {
  1032. msg_err_main("cannot write to worker's srv pipe: %d != %d; command = %s",
  1033. (int) r, (int) sizeof(rdata->rep),
  1034. rspamd_srv_command_to_string(rdata->rep.type));
  1035. }
  1036. g_free(rdata);
  1037. w->data = worker;
  1038. ev_io_stop(EV_A_ w);
  1039. ev_io_set(w, worker->srv_pipe[0], EV_READ);
  1040. ev_io_start(EV_A_ w);
  1041. }
  1042. }
  1043. void rspamd_srv_start_watching(struct rspamd_main *srv,
  1044. struct rspamd_worker *worker,
  1045. struct ev_loop *ev_base)
  1046. {
  1047. g_assert(worker != NULL);
  1048. worker->tmp_data = NULL;
  1049. worker->srv_ev.data = worker;
  1050. ev_io_init(&worker->srv_ev, rspamd_srv_handler, worker->srv_pipe[0], EV_READ);
  1051. ev_io_start(ev_base, &worker->srv_ev);
  1052. }
  1053. struct rspamd_srv_request_data {
  1054. struct rspamd_worker *worker;
  1055. struct rspamd_srv_command cmd;
  1056. int attached_fd;
  1057. struct rspamd_srv_reply rep;
  1058. rspamd_srv_reply_handler handler;
  1059. ev_io io_ev;
  1060. gpointer ud;
  1061. };
  1062. static void
  1063. rspamd_srv_request_handler(EV_P_ ev_io *w, int revents)
  1064. {
  1065. struct rspamd_srv_request_data *rd = (struct rspamd_srv_request_data *) w->data;
  1066. struct msghdr msg;
  1067. struct iovec iov;
  1068. unsigned char fdspace[CMSG_SPACE(sizeof(int))];
  1069. struct cmsghdr *cmsg;
  1070. gssize r;
  1071. int rfd = -1;
  1072. if (revents == EV_WRITE) {
  1073. /* Send request to server */
  1074. memset(&msg, 0, sizeof(msg));
  1075. /* Attach fd to the message */
  1076. if (rd->attached_fd != -1) {
  1077. memset(fdspace, 0, sizeof(fdspace));
  1078. msg.msg_control = fdspace;
  1079. msg.msg_controllen = sizeof(fdspace);
  1080. cmsg = CMSG_FIRSTHDR(&msg);
  1081. cmsg->cmsg_level = SOL_SOCKET;
  1082. cmsg->cmsg_type = SCM_RIGHTS;
  1083. cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  1084. memcpy(CMSG_DATA(cmsg), &rd->attached_fd, sizeof(int));
  1085. }
  1086. iov.iov_base = &rd->cmd;
  1087. iov.iov_len = sizeof(rd->cmd);
  1088. msg.msg_iov = &iov;
  1089. msg.msg_iovlen = 1;
  1090. r = sendmsg(w->fd, &msg, 0);
  1091. if (r == -1) {
  1092. if (r == ENOBUFS) {
  1093. /* On BSD derived systems we can have this error when trying to send
  1094. * requests too fast.
  1095. * It might be good to retry...
  1096. */
  1097. msg_info("cannot write to server pipe: %s; command = %s; retrying sending",
  1098. strerror(errno),
  1099. rspamd_srv_command_to_string(rd->cmd.type));
  1100. return;
  1101. }
  1102. msg_err("cannot write to server pipe: %s; command = %s", strerror(errno),
  1103. rspamd_srv_command_to_string(rd->cmd.type));
  1104. goto cleanup;
  1105. }
  1106. else if (r != sizeof(rd->cmd)) {
  1107. msg_err("incomplete write to the server pipe: %d != %d, command = %s",
  1108. (int) r, (int) sizeof(rd->cmd), rspamd_srv_command_to_string(rd->cmd.type));
  1109. goto cleanup;
  1110. }
  1111. ev_io_stop(EV_A_ w);
  1112. ev_io_set(w, rd->worker->srv_pipe[1], EV_READ);
  1113. ev_io_start(EV_A_ w);
  1114. }
  1115. else {
  1116. iov.iov_base = &rd->rep;
  1117. iov.iov_len = sizeof(rd->rep);
  1118. memset(&msg, 0, sizeof(msg));
  1119. msg.msg_control = fdspace;
  1120. msg.msg_controllen = sizeof(fdspace);
  1121. msg.msg_iov = &iov;
  1122. msg.msg_iovlen = 1;
  1123. r = recvmsg(w->fd, &msg, 0);
  1124. if (r == -1) {
  1125. msg_err("cannot read from server pipe: %s; command = %s", strerror(errno),
  1126. rspamd_srv_command_to_string(rd->cmd.type));
  1127. goto cleanup;
  1128. }
  1129. if (r != (int) sizeof(rd->rep)) {
  1130. msg_err("cannot read from server pipe, invalid length: %d != %d; command = %s",
  1131. (int) r, (int) sizeof(rd->rep), rspamd_srv_command_to_string(rd->cmd.type));
  1132. goto cleanup;
  1133. }
  1134. if (msg.msg_controllen >= CMSG_LEN(sizeof(int))) {
  1135. rfd = *(int *) CMSG_DATA(CMSG_FIRSTHDR(&msg));
  1136. }
  1137. /* Reply has been received */
  1138. if (rd->handler) {
  1139. rd->handler(rd->worker, &rd->rep, rfd, rd->ud);
  1140. }
  1141. goto cleanup;
  1142. }
  1143. return;
  1144. cleanup:
  1145. ev_io_stop(EV_A_ w);
  1146. g_free(rd);
  1147. }
  1148. void rspamd_srv_send_command(struct rspamd_worker *worker,
  1149. struct ev_loop *ev_base,
  1150. struct rspamd_srv_command *cmd,
  1151. int attached_fd,
  1152. rspamd_srv_reply_handler handler,
  1153. gpointer ud)
  1154. {
  1155. struct rspamd_srv_request_data *rd;
  1156. g_assert(cmd != NULL);
  1157. g_assert(worker != NULL);
  1158. rd = g_malloc0(sizeof(*rd));
  1159. cmd->id = ottery_rand_uint64();
  1160. memcpy(&rd->cmd, cmd, sizeof(rd->cmd));
  1161. rd->handler = handler;
  1162. rd->ud = ud;
  1163. rd->worker = worker;
  1164. rd->rep.id = cmd->id;
  1165. rd->rep.type = cmd->type;
  1166. rd->attached_fd = attached_fd;
  1167. rd->io_ev.data = rd;
  1168. ev_io_init(&rd->io_ev, rspamd_srv_request_handler,
  1169. rd->worker->srv_pipe[1], EV_WRITE);
  1170. ev_io_start(ev_base, &rd->io_ev);
  1171. }
  1172. enum rspamd_control_type
  1173. rspamd_control_command_from_string(const char *str)
  1174. {
  1175. enum rspamd_control_type ret = RSPAMD_CONTROL_MAX;
  1176. if (!str) {
  1177. return ret;
  1178. }
  1179. if (g_ascii_strcasecmp(str, "hyperscan_loaded") == 0) {
  1180. ret = RSPAMD_CONTROL_HYPERSCAN_LOADED;
  1181. }
  1182. else if (g_ascii_strcasecmp(str, "stat") == 0) {
  1183. ret = RSPAMD_CONTROL_STAT;
  1184. }
  1185. else if (g_ascii_strcasecmp(str, "reload") == 0) {
  1186. ret = RSPAMD_CONTROL_RELOAD;
  1187. }
  1188. else if (g_ascii_strcasecmp(str, "reresolve") == 0) {
  1189. ret = RSPAMD_CONTROL_RERESOLVE;
  1190. }
  1191. else if (g_ascii_strcasecmp(str, "recompile") == 0) {
  1192. ret = RSPAMD_CONTROL_RECOMPILE;
  1193. }
  1194. else if (g_ascii_strcasecmp(str, "log_pipe") == 0) {
  1195. ret = RSPAMD_CONTROL_LOG_PIPE;
  1196. }
  1197. else if (g_ascii_strcasecmp(str, "fuzzy_stat") == 0) {
  1198. ret = RSPAMD_CONTROL_FUZZY_STAT;
  1199. }
  1200. else if (g_ascii_strcasecmp(str, "fuzzy_sync") == 0) {
  1201. ret = RSPAMD_CONTROL_FUZZY_SYNC;
  1202. }
  1203. else if (g_ascii_strcasecmp(str, "monitored_change") == 0) {
  1204. ret = RSPAMD_CONTROL_MONITORED_CHANGE;
  1205. }
  1206. else if (g_ascii_strcasecmp(str, "child_change") == 0) {
  1207. ret = RSPAMD_CONTROL_CHILD_CHANGE;
  1208. }
  1209. return ret;
  1210. }
  1211. const char *
  1212. rspamd_control_command_to_string(enum rspamd_control_type cmd)
  1213. {
  1214. const char *reply = "unknown";
  1215. switch (cmd) {
  1216. case RSPAMD_CONTROL_STAT:
  1217. reply = "stat";
  1218. break;
  1219. case RSPAMD_CONTROL_RELOAD:
  1220. reply = "reload";
  1221. break;
  1222. case RSPAMD_CONTROL_RERESOLVE:
  1223. reply = "reresolve";
  1224. break;
  1225. case RSPAMD_CONTROL_RECOMPILE:
  1226. reply = "recompile";
  1227. break;
  1228. case RSPAMD_CONTROL_HYPERSCAN_LOADED:
  1229. reply = "hyperscan_loaded";
  1230. break;
  1231. case RSPAMD_CONTROL_LOG_PIPE:
  1232. reply = "log_pipe";
  1233. break;
  1234. case RSPAMD_CONTROL_FUZZY_STAT:
  1235. reply = "fuzzy_stat";
  1236. break;
  1237. case RSPAMD_CONTROL_FUZZY_SYNC:
  1238. reply = "fuzzy_sync";
  1239. break;
  1240. case RSPAMD_CONTROL_MONITORED_CHANGE:
  1241. reply = "monitored_change";
  1242. break;
  1243. case RSPAMD_CONTROL_CHILD_CHANGE:
  1244. reply = "child_change";
  1245. break;
  1246. default:
  1247. break;
  1248. }
  1249. return reply;
  1250. }
  1251. const char *rspamd_srv_command_to_string(enum rspamd_srv_type cmd)
  1252. {
  1253. const char *reply = "unknown";
  1254. switch (cmd) {
  1255. case RSPAMD_SRV_SOCKETPAIR:
  1256. reply = "socketpair";
  1257. break;
  1258. case RSPAMD_SRV_HYPERSCAN_LOADED:
  1259. reply = "hyperscan_loaded";
  1260. break;
  1261. case RSPAMD_SRV_MONITORED_CHANGE:
  1262. reply = "monitored_change";
  1263. break;
  1264. case RSPAMD_SRV_LOG_PIPE:
  1265. reply = "log_pipe";
  1266. break;
  1267. case RSPAMD_SRV_ON_FORK:
  1268. reply = "on_fork";
  1269. break;
  1270. case RSPAMD_SRV_HEARTBEAT:
  1271. reply = "heartbeat";
  1272. break;
  1273. case RSPAMD_SRV_HEALTH:
  1274. reply = "health";
  1275. break;
  1276. case RSPAMD_SRV_NOTICE_HYPERSCAN_CACHE:
  1277. reply = "notice_hyperscan_cache";
  1278. break;
  1279. case RSPAMD_SRV_FUZZY_BLOCKED:
  1280. reply = "fuzzy_blocked";
  1281. break;
  1282. }
  1283. return reply;
  1284. }