You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

controller.c 42KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335
  1. /*
  2. * Copyright (c) 2009, Rambler media
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. *
  13. * THIS SOFTWARE IS PROVIDED BY Rambler media ''AS IS'' AND ANY
  14. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  15. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  16. * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
  17. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  18. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  19. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  20. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  21. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  22. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  23. */
  24. #include "config.h"
  25. #include "util.h"
  26. #include "main.h"
  27. #include "message.h"
  28. #include "protocol.h"
  29. #include "upstream.h"
  30. #include "cfg_file.h"
  31. #include "cfg_xml.h"
  32. #include "map.h"
  33. #include "dns.h"
  34. #include "tokenizers/tokenizers.h"
  35. #include "classifiers/classifiers.h"
  36. #include "binlog.h"
  37. #include "statfile_sync.h"
  38. #include "lua/lua_common.h"
  /* End marker: the literal "END" followed by CRLF */
  #define END "END" CRLF
  /* Timeout for the controller's IO, in seconds */
  #define CONTROLLER_IO_TIMEOUT 120
  42. /* Init functions */
  43. gpointer init_controller ();
  44. void start_controller (struct rspamd_worker *worker);
  45. worker_t controller_worker = {
  46. "controller", /* Name */
  47. init_controller, /* Init function */
  48. start_controller, /* Start function */
  49. TRUE, /* Has socket */
  50. FALSE, /* Non unique */
  51. FALSE, /* Non threaded */
  52. TRUE /* Killable */
  53. };
  /*
   * Identifiers of the built-in controller commands. The commands[] table
   * maps each command word to one of these values.
   */
  enum command_type {
      COMMAND_PASSWORD,
      COMMAND_QUIT,
      COMMAND_RELOAD,
      COMMAND_STAT,
      COMMAND_SHUTDOWN,
      COMMAND_UPTIME,
      COMMAND_LEARN,
      COMMAND_LEARN_SPAM,
      COMMAND_LEARN_HAM,
      COMMAND_HELP,
      COMMAND_COUNTERS,
      COMMAND_SYNC,
      COMMAND_WEIGHTS
  };
  69. struct controller_command {
  70. gchar *command;
  71. gboolean privilleged;
  72. enum command_type type;
  73. };
  74. struct custom_controller_command {
  75. const gchar *command;
  76. gboolean privilleged;
  77. gboolean require_message;
  78. controller_func_t handler;
  79. };
  80. struct rspamd_controller_ctx {
  81. char *password;
  82. guint32 timeout;
  83. struct rspamd_dns_resolver *resolver;
  84. struct event_base *ev_base;
  85. };
  86. static struct controller_command commands[] = {
  87. {"password", FALSE, COMMAND_PASSWORD},
  88. {"quit", FALSE, COMMAND_QUIT},
  89. {"reload", TRUE, COMMAND_RELOAD},
  90. {"stat", FALSE, COMMAND_STAT},
  91. {"shutdown", TRUE, COMMAND_SHUTDOWN},
  92. {"uptime", FALSE, COMMAND_UPTIME},
  93. {"learn", TRUE, COMMAND_LEARN},
  94. {"weights", FALSE, COMMAND_WEIGHTS},
  95. {"help", FALSE, COMMAND_HELP},
  96. {"counters", FALSE, COMMAND_COUNTERS},
  97. {"sync", FALSE, COMMAND_SYNC},
  98. {"learn_spam", TRUE, COMMAND_LEARN_SPAM},
  99. {"learn_ham", TRUE, COMMAND_LEARN_HAM}
  100. };
  101. static GList *custom_commands = NULL;
  102. static time_t start_time;
  103. static gchar greetingbuf[1024];
  104. static sig_atomic_t wanna_die = 0;
  105. extern rspamd_hash_t *counters;
  106. static gboolean controller_write_socket (void *arg);
  107. #ifndef HAVE_SA_SIGINFO
  108. static void
  109. sig_handler (gint signo)
  110. #else
  111. static void
  112. sig_handler (gint signo, siginfo_t *info, void *unused)
  113. #endif
  114. {
  115. struct timeval tv;
  116. switch (signo) {
  117. case SIGINT:
  118. case SIGTERM:
  119. if (!wanna_die) {
  120. wanna_die = 1;
  121. tv.tv_sec = 0;
  122. tv.tv_usec = 0;
  123. event_loopexit (&tv);
  124. #ifdef WITH_GPERF_TOOLS
  125. ProfilerStop ();
  126. #endif
  127. }
  128. break;
  129. }
  130. }
  131. static void
  132. sigusr2_handler (gint fd, short what, void *arg)
  133. {
  134. struct rspamd_worker *worker = (struct rspamd_worker *)arg;
  135. /* Do not accept new connections, preparing to end worker's process */
  136. struct timeval tv;
  137. tv.tv_sec = 2;
  138. tv.tv_usec = 0;
  139. event_del (&worker->sig_ev_usr1);
  140. event_del (&worker->sig_ev_usr2);
  141. event_del (&worker->bind_ev);
  142. msg_info ("controller's shutdown is pending in %d sec", 2);
  143. event_loopexit (&tv);
  144. return;
  145. }
  146. /*
  147. * Reopen log is designed by sending sigusr1 to active workers and pending shutdown of them
  148. */
  149. static void
  150. sigusr1_handler (gint fd, short what, void *arg)
  151. {
  152. struct rspamd_worker *worker = (struct rspamd_worker *) arg;
  153. reopen_log (worker->srv->logger);
  154. return;
  155. }
  156. static void
  157. free_session (void *ud)
  158. {
  159. GList *part;
  160. struct mime_part *p;
  161. struct controller_session *session = ud;
  162. msg_debug ("freeing session %p", session);
  163. while ((part = g_list_first (session->parts))) {
  164. session->parts = g_list_remove_link (session->parts, part);
  165. p = (struct mime_part *)part->data;
  166. g_byte_array_free (p->content, FALSE);
  167. g_list_free_1 (part);
  168. }
  169. rspamd_remove_dispatcher (session->dispatcher);
  170. close (session->sock);
  171. memory_pool_delete (session->session_pool);
  172. g_free (session);
  173. }
  174. static gint
  175. check_auth (struct controller_command *cmd, struct controller_session *session)
  176. {
  177. gchar out_buf[128];
  178. gint r;
  179. if (cmd->privilleged && !session->authorized) {
  180. r = rspamd_snprintf (out_buf, sizeof (out_buf), "not authorized" CRLF);
  181. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  182. return 0;
  183. }
  184. return 0;
  185. }
  186. return 1;
  187. }
  188. static void
  189. counter_write_callback (gpointer key, gpointer value, void *data)
  190. {
  191. struct controller_session *session = data;
  192. struct counter_data *cd = value;
  193. gchar *name = key;
  194. gchar out_buf[128];
  195. gint r;
  196. r = rspamd_snprintf (out_buf, sizeof (out_buf), "%s: %uD" CRLF, name, (guint32)cd->value);
  197. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, TRUE, FALSE)) {
  198. msg_warn ("cannot write to socket");
  199. }
  200. }
  201. static gboolean
  202. write_whole_statfile (struct controller_session *session, gchar *symbol, struct classifier_config *ccf)
  203. {
  204. stat_file_t *statfile;
  205. struct statfile *st;
  206. gchar out_buf[BUFSIZ];
  207. guint i;
  208. guint64 rev, ti, len, pos, blocks;
  209. gchar *out;
  210. struct rspamd_binlog_element log_elt;
  211. struct stat_file_block *stat_elt;
  212. statfile = get_statfile_by_symbol (session->worker->srv->statfile_pool, ccf,
  213. symbol, &st, FALSE);
  214. if (statfile == NULL) {
  215. return FALSE;
  216. }
  217. /* Begin to copy all blocks into array */
  218. statfile_get_revision (statfile, &rev, (time_t *)&ti);
  219. if (ti == 0) {
  220. /* Not tracked file */
  221. ti = time (NULL);
  222. statfile_set_revision (statfile, rev, ti);
  223. }
  224. msg_info ("send a whole statfile %s with version %uL to slave", symbol, rev);
  225. blocks = statfile_get_total_blocks (statfile);
  226. len = blocks * sizeof (struct rspamd_binlog_element);
  227. out = memory_pool_alloc (session->session_pool, len);
  228. for (i = 0, pos = 0; i < blocks; i ++) {
  229. stat_elt = (struct stat_file_block *)((u_char *)statfile->map + statfile->seek_pos + i * sizeof (struct stat_file_block));
  230. if (fabs (stat_elt->value) > 0.001) {
  231. /* Write only those values which value is not 0 */
  232. log_elt.h1 = stat_elt->hash1;
  233. log_elt.h2 = stat_elt->hash2;
  234. log_elt.value = stat_elt->value;
  235. memcpy (out + pos, &log_elt, sizeof (log_elt));
  236. pos += sizeof (struct rspamd_binlog_element);
  237. }
  238. }
  239. i = rspamd_snprintf (out_buf, sizeof (out_buf), "%uL %uL %uL" CRLF, rev, ti, pos);
  240. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, i, TRUE, FALSE)) {
  241. return FALSE;
  242. }
  243. if (!rspamd_dispatcher_write (session->dispatcher, out, pos, TRUE, TRUE)) {
  244. return FALSE;
  245. }
  246. return TRUE;
  247. }
  248. static gboolean
  249. process_sync_command (struct controller_session *session, gchar **args)
  250. {
  251. gchar out_buf[BUFSIZ], *arg, *err_str, *symbol;
  252. gint r;
  253. guint64 rev, time;
  254. struct statfile *st = NULL;
  255. struct classifier_config *ccf;
  256. GList *cur;
  257. struct rspamd_binlog *binlog;
  258. GByteArray *data = NULL;
  259. arg = *args;
  260. if (!arg || *arg == '\0') {
  261. msg_info ("bad arguments to sync command, need symbol");
  262. return FALSE;
  263. }
  264. symbol = arg;
  265. arg = *(args + 1);
  266. if (!arg || *arg == '\0') {
  267. msg_info ("bad arguments to sync command, need revision");
  268. return FALSE;
  269. }
  270. rev = strtoull (arg, &err_str, 10);
  271. if (err_str && *err_str != 0) {
  272. msg_info ("bad arguments to sync command: %s", arg);
  273. return FALSE;
  274. }
  275. arg = *(args + 2);
  276. if (!arg || *arg == '\0') {
  277. msg_info ("bad arguments to sync command, need time");
  278. return FALSE;
  279. }
  280. time = strtoull (arg, &err_str, 10);
  281. if (err_str && *err_str != 0) {
  282. msg_info ("bad arguments to sync command: %s", arg);
  283. return FALSE;
  284. }
  285. ccf = g_hash_table_lookup (session->cfg->classifiers_symbols, symbol);
  286. if (ccf == NULL) {
  287. msg_info ("bad symbol: %s", symbol);
  288. return FALSE;
  289. }
  290. cur = g_list_first (ccf->statfiles);
  291. while (cur) {
  292. st = cur->data;
  293. if (strcmp (symbol, st->symbol) == 0) {
  294. break;
  295. }
  296. st = NULL;
  297. cur = g_list_next (cur);
  298. }
  299. if (st == NULL) {
  300. msg_info ("bad symbol: %s", symbol);
  301. return FALSE;
  302. }
  303. binlog = get_binlog_by_statfile (st);
  304. if (binlog == NULL) {
  305. msg_info ("cannot open binlog: %s", symbol);
  306. return FALSE;
  307. }
  308. while (binlog_sync (binlog, rev, &time, &data)) {
  309. rev ++;
  310. r = rspamd_snprintf (out_buf, sizeof (out_buf), "%uL %uL %z" CRLF, rev, time, data->len);
  311. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  312. if (data != NULL) {
  313. g_free (data);
  314. }
  315. return FALSE;
  316. }
  317. if (data->data != NULL) {
  318. if (!rspamd_dispatcher_write (session->dispatcher, data->data, data->len, TRUE, FALSE)) {
  319. if (data != NULL) {
  320. g_free (data);
  321. }
  322. return FALSE;
  323. }
  324. }
  325. }
  326. if (time == 0) {
  327. if (data != NULL) {
  328. g_free (data);
  329. }
  330. return write_whole_statfile (session, symbol, ccf);
  331. }
  332. if (data != NULL) {
  333. g_free (data);
  334. }
  335. return TRUE;
  336. }
  337. static gboolean
  338. process_stat_command (struct controller_session *session)
  339. {
  340. gchar out_buf[BUFSIZ];
  341. gint r;
  342. guint64 used, total, rev;
  343. time_t ti;
  344. memory_pool_stat_t mem_st;
  345. struct classifier_config *ccf;
  346. stat_file_t *statfile;
  347. struct statfile *st;
  348. GList *cur_cl, *cur_st;
  349. memory_pool_stat (&mem_st);
  350. r = rspamd_snprintf (out_buf, sizeof (out_buf), "Messages scanned: %ud" CRLF, session->worker->srv->stat->messages_scanned);
  351. if (session->worker->srv->stat->messages_scanned > 0) {
  352. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Messages treated as spam: %ud, %.2f%%" CRLF, session->worker->srv->stat->messages_spam,
  353. (double)session->worker->srv->stat->messages_spam / (double)session->worker->srv->stat->messages_scanned * 100.);
  354. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Messages treated as ham: %ud, %.2f%%" CRLF, session->worker->srv->stat->messages_ham,
  355. (double)session->worker->srv->stat->messages_ham / (double)session->worker->srv->stat->messages_scanned * 100.);
  356. }
  357. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Messages learned: %ud" CRLF, session->worker->srv->stat->messages_learned);
  358. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Connections count: %ud" CRLF, session->worker->srv->stat->connections_count);
  359. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Control connections count: %ud" CRLF, session->worker->srv->stat->control_connections_count);
  360. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Pools allocated: %z" CRLF, mem_st.pools_allocated);
  361. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Pools freed: %z" CRLF, mem_st.pools_freed);
  362. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Bytes allocated: %z" CRLF, mem_st.bytes_allocated);
  363. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Memory chunks allocated: %z" CRLF, mem_st.chunks_allocated);
  364. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Shared chunks allocated: %z" CRLF, mem_st.shared_chunks_allocated);
  365. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Chunks freed: %z" CRLF, mem_st.chunks_freed);
  366. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Oversized chunks: %z" CRLF, mem_st.oversized_chunks);
  367. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Fuzzy hashes stored: %ud" CRLF, session->worker->srv->stat->fuzzy_hashes);
  368. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r, "Fuzzy hashes expired: %ud" CRLF, session->worker->srv->stat->fuzzy_hashes_expired);
  369. /* Now write statistics for each statfile */
  370. cur_cl = g_list_first (session->cfg->classifiers);
  371. while (cur_cl) {
  372. ccf = cur_cl->data;
  373. cur_st = g_list_first (ccf->statfiles);
  374. while (cur_st) {
  375. st = cur_st->data;
  376. if ((statfile = statfile_pool_is_open (session->worker->srv->statfile_pool, st->path)) == NULL) {
  377. statfile = statfile_pool_open (session->worker->srv->statfile_pool, st->path, st->size, FALSE);
  378. }
  379. if (statfile) {
  380. used = statfile_get_used_blocks (statfile);
  381. total = statfile_get_total_blocks (statfile);
  382. statfile_get_revision (statfile, &rev, &ti);
  383. if (total != (guint64)-1 && used != (guint64)-1) {
  384. r += rspamd_snprintf (out_buf + r, sizeof (out_buf) - r,
  385. "Statfile: %s (version %uL); length: %Hz; free blocks: %uL; total blocks: %uL; free: %.2f%%" CRLF,
  386. st->symbol, rev, st->size,
  387. (total - used), total,
  388. (double)((double)(total - used) / (double)total) * 100.);
  389. }
  390. }
  391. cur_st = g_list_next (cur_st);
  392. }
  393. cur_cl = g_list_next (cur_cl);
  394. }
  395. return rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE);
  396. }
  397. static gboolean
  398. process_command (struct controller_command *cmd, gchar **cmd_args, struct controller_session *session)
  399. {
  400. gchar out_buf[BUFSIZ], *arg, *err_str;
  401. gint r = 0, days, hours, minutes;
  402. time_t uptime;
  403. guint32 size = 0;
  404. struct classifier_config *cl;
  405. struct rspamd_controller_ctx *ctx = session->worker->ctx;
  406. switch (cmd->type) {
  407. case COMMAND_PASSWORD:
  408. arg = *cmd_args;
  409. if (!arg || *arg == '\0') {
  410. msg_debug ("empty password passed");
  411. r = rspamd_snprintf (out_buf, sizeof (out_buf), "password command requires one argument" CRLF);
  412. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  413. return FALSE;
  414. }
  415. return TRUE;
  416. }
  417. if (ctx->password == NULL) {
  418. r = rspamd_snprintf (out_buf, sizeof (out_buf), "password command disabled in config, authorized access unallowed" CRLF);
  419. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  420. return FALSE;
  421. }
  422. return TRUE;
  423. }
  424. if (strncmp (arg, ctx->password, strlen (arg)) == 0) {
  425. session->authorized = 1;
  426. r = rspamd_snprintf (out_buf, sizeof (out_buf), "password accepted" CRLF);
  427. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  428. return FALSE;
  429. }
  430. }
  431. else {
  432. session->authorized = 0;
  433. r = rspamd_snprintf (out_buf, sizeof (out_buf), "password NOT accepted" CRLF);
  434. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  435. return FALSE;
  436. }
  437. }
  438. break;
  439. case COMMAND_QUIT:
  440. session->state = STATE_QUIT;
  441. break;
  442. case COMMAND_RELOAD:
  443. if (check_auth (cmd, session)) {
  444. r = rspamd_snprintf (out_buf, sizeof (out_buf), "reload request sent" CRLF);
  445. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  446. return FALSE;
  447. }
  448. kill (getppid (), SIGHUP);
  449. }
  450. break;
  451. case COMMAND_STAT:
  452. if (check_auth (cmd, session)) {
  453. return process_stat_command (session);
  454. }
  455. break;
  456. case COMMAND_SHUTDOWN:
  457. if (check_auth (cmd, session)) {
  458. r = rspamd_snprintf (out_buf, sizeof (out_buf), "shutdown request sent" CRLF);
  459. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  460. return FALSE;
  461. }
  462. kill (getppid (), SIGTERM);
  463. }
  464. break;
  465. case COMMAND_UPTIME:
  466. if (check_auth (cmd, session)) {
  467. uptime = time (NULL) - start_time;
  468. /* If uptime more than 2 hours, print as a number of days. */
  469. if (uptime >= 2 * 3600) {
  470. days = uptime / 86400;
  471. hours = uptime / 3600 - days * 24;
  472. minutes = uptime / 60 - hours * 60 - days * 1440;
  473. r = rspamd_snprintf (out_buf, sizeof (out_buf), "%d day%s %d hour%s %d minute%s" CRLF, days, days > 1 ? "s" : " ", hours, hours > 1 ? "s" : " ", minutes, minutes > 1 ? "s" : " ");
  474. }
  475. /* If uptime is less than 1 minute print only seconds */
  476. else if (uptime / 60 == 0) {
  477. r = rspamd_snprintf (out_buf, sizeof (out_buf), "%d second%s" CRLF, (gint)uptime, (gint)uptime > 1 ? "s" : " ");
  478. }
  479. /* Else print the minutes and seconds. */
  480. else {
  481. hours = uptime / 3600;
  482. minutes = uptime / 60 - hours * 60;
  483. uptime -= hours * 3600 + minutes * 60;
  484. r = rspamd_snprintf (out_buf, sizeof (out_buf), "%d hour%s %d minute%s %d second%s" CRLF, hours, hours > 1 ? "s" : " ", minutes, minutes > 1 ? "s" : " ", (gint)uptime, uptime > 1 ? "s" : " ");
  485. }
  486. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  487. return FALSE;
  488. }
  489. }
  490. break;
  491. case COMMAND_LEARN_SPAM:
  492. if (check_auth (cmd, session)) {
  493. arg = *cmd_args;
  494. if (!arg || *arg == '\0') {
  495. msg_debug ("no statfile specified in learn command");
  496. r = rspamd_snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
  497. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  498. return FALSE;
  499. }
  500. return TRUE;
  501. }
  502. arg = *(cmd_args + 1);
  503. if (arg == NULL || *arg == '\0') {
  504. msg_debug ("no message size specified in learn command");
  505. r = rspamd_snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: symbol and message size" CRLF);
  506. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  507. return FALSE;
  508. }
  509. return TRUE;
  510. }
  511. size = strtoul (arg, &err_str, 10);
  512. if (err_str && *err_str != '\0') {
  513. msg_debug ("message size is invalid: %s", arg);
  514. r = rspamd_snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
  515. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  516. return FALSE;
  517. }
  518. return TRUE;
  519. }
  520. cl = find_classifier_conf (session->cfg, *cmd_args);
  521. if (cl == NULL) {
  522. r = rspamd_snprintf (out_buf, sizeof (out_buf), "classifier %s is not defined" CRLF, *cmd_args);
  523. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  524. return FALSE;
  525. }
  526. return TRUE;
  527. }
  528. session->learn_classifier = cl;
  529. /* By default learn positive */
  530. session->in_class = TRUE;
  531. rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
  532. session->state = STATE_LEARN_SPAM_PRE;
  533. }
  534. break;
  535. case COMMAND_LEARN_HAM:
  536. if (check_auth (cmd, session)) {
  537. arg = *cmd_args;
  538. if (!arg || *arg == '\0') {
  539. msg_debug ("no statfile specified in learn command");
  540. r = rspamd_snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
  541. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  542. return FALSE;
  543. }
  544. return TRUE;
  545. }
  546. arg = *(cmd_args + 1);
  547. if (arg == NULL || *arg == '\0') {
  548. msg_debug ("no message size specified in learn command");
  549. r = rspamd_snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: symbol and message size" CRLF);
  550. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  551. return FALSE;
  552. }
  553. return TRUE;
  554. }
  555. size = strtoul (arg, &err_str, 10);
  556. if (err_str && *err_str != '\0') {
  557. msg_debug ("message size is invalid: %s", arg);
  558. r = rspamd_snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
  559. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  560. return FALSE;
  561. }
  562. return TRUE;
  563. }
  564. cl = find_classifier_conf (session->cfg, *cmd_args);
  565. if (cl == NULL) {
  566. r = rspamd_snprintf (out_buf, sizeof (out_buf), "classifier %s is not defined" CRLF, *cmd_args);
  567. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  568. return FALSE;
  569. }
  570. return TRUE;
  571. }
  572. session->learn_classifier = cl;
  573. /* By default learn positive */
  574. session->in_class = FALSE;
  575. rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
  576. session->state = STATE_LEARN_SPAM_PRE;
  577. }
  578. break;
  579. case COMMAND_LEARN:
  580. if (check_auth (cmd, session)) {
  581. arg = *cmd_args;
  582. if (!arg || *arg == '\0') {
  583. msg_debug ("no statfile specified in learn command");
  584. r = rspamd_snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: stat filename and its size" CRLF);
  585. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  586. return FALSE;
  587. }
  588. return TRUE;
  589. }
  590. arg = *(cmd_args + 1);
  591. if (arg == NULL || *arg == '\0') {
  592. msg_debug ("no message size specified in learn command");
  593. r = rspamd_snprintf (out_buf, sizeof (out_buf), "learn command requires at least two arguments: symbol and message size" CRLF);
  594. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  595. return FALSE;
  596. }
  597. return TRUE;
  598. }
  599. size = strtoul (arg, &err_str, 10);
  600. if (err_str && *err_str != '\0') {
  601. msg_debug ("message size is invalid: %s", arg);
  602. r = rspamd_snprintf (out_buf, sizeof (out_buf), "learn size is invalid" CRLF);
  603. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  604. return FALSE;
  605. }
  606. return TRUE;
  607. }
  608. session->learn_symbol = memory_pool_strdup (session->session_pool, *cmd_args);
  609. cl = g_hash_table_lookup (session->cfg->classifiers_symbols, *cmd_args);
  610. if (cl == NULL) {
  611. r = rspamd_snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, *cmd_args);
  612. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  613. return FALSE;
  614. }
  615. return TRUE;
  616. }
  617. session->learn_classifier = cl;
  618. /* By default learn positive */
  619. session->in_class = 1;
  620. session->learn_multiplier = 1.;
  621. /* Get all arguments */
  622. while (*cmd_args++) {
  623. arg = *cmd_args;
  624. if (arg && *arg == '-') {
  625. switch (*(arg + 1)) {
  626. case 'r':
  627. arg = *(cmd_args + 1);
  628. if (!arg || *arg == '\0') {
  629. r = rspamd_snprintf (out_buf, sizeof (out_buf), "recipient is not defined" CRLF);
  630. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  631. return FALSE;
  632. }
  633. }
  634. session->learn_rcpt = memory_pool_strdup (session->session_pool, arg);
  635. break;
  636. case 'f':
  637. arg = *(cmd_args + 1);
  638. if (!arg || *arg == '\0') {
  639. r = rspamd_snprintf (out_buf, sizeof (out_buf), "from is not defined" CRLF);
  640. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  641. return FALSE;
  642. }
  643. }
  644. session->learn_from = memory_pool_strdup (session->session_pool, arg);
  645. break;
  646. case 'n':
  647. session->in_class = 0;
  648. break;
  649. case 'm':
  650. arg = *(cmd_args + 1);
  651. if (!arg || *arg == '\0') {
  652. r = rspamd_snprintf (out_buf, sizeof (out_buf), "multiplier is not defined" CRLF);
  653. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  654. return FALSE;
  655. }
  656. }
  657. else {
  658. session->learn_multiplier = strtod (arg, NULL);
  659. }
  660. break;
  661. default:
  662. r = rspamd_snprintf (out_buf, sizeof (out_buf), "tokenizer is not defined" CRLF);
  663. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  664. return FALSE;
  665. }
  666. return TRUE;
  667. }
  668. }
  669. }
  670. rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
  671. session->state = STATE_LEARN;
  672. }
  673. break;
  674. case COMMAND_WEIGHTS:
  675. arg = *cmd_args;
  676. if (!arg || *arg == '\0') {
  677. msg_debug ("no statfile specified in weights command");
  678. r = rspamd_snprintf (out_buf, sizeof (out_buf), "weights command requires two arguments: statfile and message size" CRLF);
  679. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  680. return FALSE;
  681. }
  682. return TRUE;
  683. }
  684. arg = *(cmd_args + 1);
  685. if (arg == NULL || *arg == '\0') {
  686. msg_debug ("no message size specified in weights command");
  687. r = rspamd_snprintf (out_buf, sizeof (out_buf), "weights command requires two arguments: statfile and message size" CRLF);
  688. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  689. return FALSE;
  690. }
  691. return TRUE;
  692. }
  693. size = strtoul (arg, &err_str, 10);
  694. if (err_str && *err_str != '\0') {
  695. msg_debug ("message size is invalid: %s", arg);
  696. r = rspamd_snprintf (out_buf, sizeof (out_buf), "message size is invalid" CRLF);
  697. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  698. return FALSE;
  699. }
  700. return TRUE;
  701. }
  702. cl = g_hash_table_lookup (session->cfg->classifiers_symbols, *cmd_args);
  703. if (cl == NULL) {
  704. r = rspamd_snprintf (out_buf, sizeof (out_buf), "statfile %s is not defined" CRLF, *cmd_args);
  705. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  706. return FALSE;
  707. }
  708. return TRUE;
  709. }
  710. session->learn_classifier = cl;
  711. rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_CHARACTER, size);
  712. session->state = STATE_WEIGHTS;
  713. break;
  714. case COMMAND_SYNC:
  715. if (!process_sync_command (session, cmd_args)) {
  716. r = rspamd_snprintf (out_buf, sizeof (out_buf), "FAIL" CRLF);
  717. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  718. return FALSE;
  719. }
  720. return TRUE;
  721. }
  722. break;
  723. case COMMAND_HELP:
  724. r = rspamd_snprintf (out_buf, sizeof (out_buf),
  725. "Rspamd CLI commands (* - privileged command):" CRLF
  726. " help - this help message" CRLF
  727. "(*) learn <statfile> <size> [-r recipient] [-m multiplier] [-f from] [-n] - learn message to specified statfile" CRLF
  728. " quit - quit CLI session" CRLF
  729. " password <password> - authenticate yourself for privileged commands" CRLF
  730. "(*) reload - reload rspamd" CRLF
  731. "(*) shutdown - shutdown rspamd" CRLF
  732. " stat - show different rspamd stat" CRLF
  733. " sync - run synchronization of statfiles" CRLF
  734. " counters - show rspamd counters" CRLF
  735. " uptime - rspamd uptime" CRLF
  736. " weights <statfile> <size> - weight of message in all statfiles" CRLF);
  737. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  738. return FALSE;
  739. }
  740. break;
  741. case COMMAND_COUNTERS:
  742. rspamd_hash_foreach (counters, counter_write_callback, session);
  743. break;
  744. }
  745. return TRUE;
  746. }
  747. static gboolean
  748. process_custom_command (gchar *line, gchar **cmd_args, struct controller_session *session)
  749. {
  750. GList *cur;
  751. struct custom_controller_command *cmd;
  752. cur = custom_commands;
  753. while (cur) {
  754. cmd = cur->data;
  755. if (g_ascii_strcasecmp (cmd->command, line) == 0) {
  756. /* Call handler */
  757. cmd->handler (cmd_args, session);
  758. return TRUE;
  759. }
  760. cur = g_list_next (cur);
  761. }
  762. return FALSE;
  763. }
  764. static struct controller_command *
  765. process_normal_command (const gchar *line)
  766. {
  767. guint i;
  768. struct controller_command *c;
  769. for (i = 0; i < G_N_ELEMENTS (commands); i ++) {
  770. c = &commands[i];
  771. if (g_ascii_strcasecmp (line, c->command) == 0) {
  772. return c;
  773. }
  774. }
  775. return NULL;
  776. }
  777. /*
  778. * Called if all filters are processed
  779. */
  780. static void
  781. fin_learn_task (void *arg)
  782. {
  783. struct worker_task *task = (struct worker_task *) arg;
  784. if (task->state != WRITING_REPLY) {
  785. task->state = WRITE_REPLY;
  786. /* Process all statfiles */
  787. process_statfiles (task);
  788. /* Call post filters */
  789. lua_call_post_filters (task);
  790. }
  791. /* Check if we have all events finished */
  792. if (task->state != WRITING_REPLY) {
  793. if (task->fin_callback) {
  794. task->fin_callback (task->fin_arg);
  795. }
  796. else {
  797. rspamd_dispatcher_restore (task->dispatcher);
  798. }
  799. }
  800. }
  801. /*
  802. * Called if session was restored inside fin callback
  803. */
  804. static void
  805. restore_learn_task (void *arg)
  806. {
  807. struct worker_task *task = (struct worker_task *) arg;
  808. /* Special state */
  809. task->state = WRITING_REPLY;
  810. rspamd_dispatcher_pause (task->dispatcher);
  811. }
  812. static gboolean
  813. controller_read_socket (f_str_t * in, void *arg)
  814. {
  815. struct controller_session *session = (struct controller_session *)arg;
  816. struct classifier_ctx *cls_ctx;
  817. gint len, i, r;
  818. gchar *s, **params, *cmd, out_buf[128];
  819. struct controller_command *command;
  820. struct worker_task *task;
  821. struct mime_text_part *part;
  822. GList *cur = NULL;
  823. GTree *tokens = NULL;
  824. GError *err = NULL;
  825. f_str_t c;
  826. switch (session->state) {
  827. case STATE_COMMAND:
  828. s = fstrcstr (in, session->session_pool);
  829. params = g_strsplit_set (s, " ", -1);
  830. memory_pool_add_destructor (session->session_pool, (pool_destruct_func) g_strfreev, params);
  831. len = g_strv_length (params);
  832. if (len > 0) {
  833. cmd = g_strstrip (params[0]);
  834. command = process_normal_command (cmd);
  835. if (command != NULL) {
  836. if (! process_command (command, &params[1], session)) {
  837. return FALSE;
  838. }
  839. }
  840. else {
  841. if (!process_custom_command (cmd, &params[1], session)) {
  842. msg_debug ("'%s'", cmd);
  843. i = rspamd_snprintf (out_buf, sizeof (out_buf), "Unknown command" CRLF);
  844. if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
  845. return FALSE;
  846. }
  847. }
  848. }
  849. }
  850. if (session->state == STATE_COMMAND) {
  851. session->state = STATE_REPLY;
  852. }
  853. if (session->state != STATE_LEARN && session->state != STATE_LEARN_SPAM_PRE
  854. && session->state != STATE_WEIGHTS && session->state != STATE_OTHER) {
  855. if (!rspamd_dispatcher_write (session->dispatcher, END, sizeof (END) - 1, FALSE, TRUE)) {
  856. return FALSE;
  857. }
  858. }
  859. break;
  860. case STATE_LEARN:
  861. session->learn_buf = in;
  862. task = construct_task (session->worker);
  863. task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
  864. task->msg->begin = in->begin;
  865. task->msg->len = in->len;
  866. task->ev_base = session->ev_base;
  867. r = process_message (task);
  868. if (r == -1) {
  869. msg_warn ("processing of message failed");
  870. free_task (task, FALSE);
  871. session->state = STATE_REPLY;
  872. r = rspamd_snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
  873. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  874. return FALSE;
  875. }
  876. return FALSE;
  877. }
  878. if (!learn_task (session->learn_symbol, task, &err)) {
  879. if (err) {
  880. i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, learn classifier error: %s" CRLF END, err->message);
  881. g_error_free (err);
  882. }
  883. else {
  884. i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, unknown learn classifier error" CRLF END);
  885. }
  886. free_task (task, FALSE);
  887. if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
  888. return FALSE;
  889. }
  890. session->state = STATE_REPLY;
  891. return TRUE;
  892. }
  893. free_task (task, FALSE);
  894. i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF END);
  895. session->state = STATE_REPLY;
  896. if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
  897. return FALSE;
  898. }
  899. break;
  900. case STATE_LEARN_SPAM_PRE:
  901. session->learn_buf = in;
  902. task = construct_task (session->worker);
  903. task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
  904. task->msg->begin = in->begin;
  905. task->msg->len = in->len;
  906. task->resolver = session->resolver;
  907. task->ev_base = session->ev_base;
  908. r = process_message (task);
  909. if (r == -1) {
  910. msg_warn ("processing of message failed");
  911. session->state = STATE_REPLY;
  912. r = rspamd_snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
  913. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  914. return FALSE;
  915. }
  916. return FALSE;
  917. }
  918. /* Set up async session */
  919. task->s = new_async_session (task->task_pool, fin_learn_task, restore_learn_task, free_task_hard, task);
  920. r = process_filters (task);
  921. if (r == -1) {
  922. session->state = STATE_REPLY;
  923. r = rspamd_snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF);
  924. destroy_session (task->s);
  925. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  926. return FALSE;
  927. }
  928. }
  929. else {
  930. session->state = STATE_LEARN_SPAM;
  931. task->dispatcher = session->dispatcher;
  932. session->learn_task = task;
  933. rspamd_dispatcher_pause (session->dispatcher);
  934. }
  935. break;
  936. case STATE_WEIGHTS:
  937. session->learn_buf = in;
  938. task = construct_task (session->worker);
  939. task->msg = memory_pool_alloc (task->task_pool, sizeof (f_str_t));
  940. task->msg->begin = in->begin;
  941. task->msg->len = in->len;
  942. task->ev_base = session->ev_base;
  943. r = process_message (task);
  944. if (r == -1) {
  945. msg_warn ("processing of message failed");
  946. free_task (task, FALSE);
  947. session->state = STATE_REPLY;
  948. r = rspamd_snprintf (out_buf, sizeof (out_buf), "cannot process message" CRLF END);
  949. if (! rspamd_dispatcher_write (session->dispatcher, out_buf, r, FALSE, FALSE)) {
  950. return FALSE;
  951. }
  952. return FALSE;
  953. }
  954. cur = g_list_first (task->text_parts);
  955. while (cur) {
  956. part = cur->data;
  957. if (part->is_empty) {
  958. cur = g_list_next (cur);
  959. continue;
  960. }
  961. c.begin = part->content->data;
  962. c.len = part->content->len;
  963. if (!session->learn_classifier->tokenizer->tokenize_func (session->learn_classifier->tokenizer,
  964. session->session_pool, &c, &tokens, FALSE, part->is_utf, part->urls_offset)) {
  965. i = rspamd_snprintf (out_buf, sizeof (out_buf), "weights failed, tokenizer error" CRLF END);
  966. free_task (task, FALSE);
  967. if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
  968. return FALSE;
  969. }
  970. session->state = STATE_REPLY;
  971. return TRUE;
  972. }
  973. cur = g_list_next (cur);
  974. }
  975. /* Handle messages without text */
  976. if (tokens == NULL) {
  977. i = rspamd_snprintf (out_buf, sizeof (out_buf), "weights failed, no tokens can be extracted (no text data)" CRLF END);
  978. free_task (task, FALSE);
  979. if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
  980. return FALSE;
  981. }
  982. session->state = STATE_REPLY;
  983. return TRUE;
  984. }
  985. /* Init classifier */
  986. cls_ctx = session->learn_classifier->classifier->init_func (session->session_pool, session->learn_classifier);
  987. cur = session->learn_classifier->classifier->weights_func (cls_ctx, session->worker->srv->statfile_pool,
  988. tokens, task);
  989. i = 0;
  990. struct classify_weight *w;
  991. while (cur) {
  992. w = cur->data;
  993. i += rspamd_snprintf (out_buf + i, sizeof (out_buf) - i, "%s: %G" CRLF, w->name, w->weight);
  994. cur = g_list_next (cur);
  995. }
  996. i += rspamd_snprintf (out_buf + i, sizeof (out_buf) - i, END);
  997. if (i != 0) {
  998. if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
  999. return FALSE;
  1000. }
  1001. }
  1002. else {
  1003. if (!rspamd_dispatcher_write (session->dispatcher, "weights failed: classifier error" CRLF END, 0, FALSE, TRUE)) {
  1004. return FALSE;
  1005. }
  1006. }
  1007. free_task (task, FALSE);
  1008. session->state = STATE_REPLY;
  1009. break;
  1010. case STATE_OTHER:
  1011. if (session->other_handler) {
  1012. session->other_handler (session, in);
  1013. }
  1014. rspamd_dispatcher_pause (session->dispatcher);
  1015. break;
  1016. case STATE_WAIT:
  1017. rspamd_dispatcher_pause (session->dispatcher);
  1018. break;
  1019. default:
  1020. msg_debug ("unknown state while reading %d", session->state);
  1021. break;
  1022. }
  1023. if (session->state == STATE_REPLY || session->state == STATE_QUIT) {
  1024. rspamd_dispatcher_restore (session->dispatcher);
  1025. }
  1026. return TRUE;
  1027. }
  1028. static gboolean
  1029. controller_write_socket (void *arg)
  1030. {
  1031. struct controller_session *session = (struct controller_session *)arg;
  1032. gint i;
  1033. gchar out_buf[1024];
  1034. GError *err = NULL;
  1035. if (session->state == STATE_QUIT) {
  1036. /* Free buffers */
  1037. destroy_session (session->s);
  1038. return FALSE;
  1039. }
  1040. else if (session->state == STATE_LEARN_SPAM) {
  1041. /* Perform actual learn here */
  1042. if (! learn_task_spam (session->learn_classifier, session->learn_task, session->in_class, &err)) {
  1043. if (err) {
  1044. i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, learn classifier error: %s" CRLF END, err->message);
  1045. g_error_free (err);
  1046. }
  1047. else {
  1048. i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn failed, unknown learn classifier error" CRLF END);
  1049. }
  1050. }
  1051. else {
  1052. i = rspamd_snprintf (out_buf, sizeof (out_buf), "learn ok" CRLF END);
  1053. }
  1054. session->learn_task->dispatcher = NULL;
  1055. destroy_session (session->learn_task->s);
  1056. session->state = STATE_REPLY;
  1057. if (!rspamd_dispatcher_write (session->dispatcher, out_buf, i, FALSE, FALSE)) {
  1058. return FALSE;
  1059. }
  1060. return TRUE;
  1061. }
  1062. else if (session->state == STATE_REPLY) {
  1063. session->state = STATE_COMMAND;
  1064. rspamd_set_dispatcher_policy (session->dispatcher, BUFFER_LINE, BUFSIZ);
  1065. }
  1066. rspamd_dispatcher_restore (session->dispatcher);
  1067. return TRUE;
  1068. }
  1069. static void
  1070. controller_err_socket (GError * err, void *arg)
  1071. {
  1072. struct controller_session *session = (struct controller_session *)arg;
  1073. if (err->code != EOF) {
  1074. msg_info ("abnormally closing control connection, error: %s", err->message);
  1075. }
  1076. g_error_free (err);
  1077. /* Free buffers */
  1078. destroy_session (session->s);
  1079. }
  1080. static void
  1081. accept_socket (gint fd, short what, void *arg)
  1082. {
  1083. struct rspamd_worker *worker = (struct rspamd_worker *)arg;
  1084. union sa_union su;
  1085. struct controller_session *new_session;
  1086. struct timeval *io_tv;
  1087. socklen_t addrlen = sizeof (su.ss);
  1088. gint nfd;
  1089. struct rspamd_controller_ctx *ctx;
  1090. ctx = worker->ctx;
  1091. if ((nfd = accept_from_socket (fd, (struct sockaddr *)&su.ss, &addrlen)) == -1) {
  1092. return;
  1093. }
  1094. new_session = g_malloc (sizeof (struct controller_session));
  1095. if (new_session == NULL) {
  1096. msg_err ("cannot allocate memory for task, %s", strerror (errno));
  1097. return;
  1098. }
  1099. bzero (new_session, sizeof (struct controller_session));
  1100. new_session->worker = worker;
  1101. new_session->sock = nfd;
  1102. new_session->cfg = worker->srv->cfg;
  1103. new_session->state = STATE_COMMAND;
  1104. new_session->session_pool = memory_pool_new (memory_pool_get_size () - 1);
  1105. new_session->resolver = ctx->resolver;
  1106. new_session->ev_base = ctx->ev_base;
  1107. worker->srv->stat->control_connections_count++;
  1108. /* Set up dispatcher */
  1109. io_tv = memory_pool_alloc (new_session->session_pool, sizeof (struct timeval));
  1110. io_tv->tv_sec = ctx->timeout / 1000;
  1111. io_tv->tv_usec = ctx->timeout - io_tv->tv_sec * 1000;
  1112. new_session->s = new_async_session (new_session->session_pool, NULL, NULL, free_session, new_session);
  1113. new_session->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, controller_read_socket,
  1114. controller_write_socket, controller_err_socket, io_tv, (void *)new_session);
  1115. if (su.ss.ss_family == AF_UNIX) {
  1116. msg_info ("accepted connection from unix socket");
  1117. new_session->dispatcher->peer_addr = INADDR_LOOPBACK;
  1118. }
  1119. else if (su.ss.ss_family == AF_INET) {
  1120. msg_info ("accepted connection from %s port %d",
  1121. inet_ntoa (su.s4.sin_addr), ntohs (su.s4.sin_port));
  1122. memcpy (&new_session->dispatcher->peer_addr, &su.s4.sin_addr,
  1123. sizeof (guint32));
  1124. }
  1125. if (! rspamd_dispatcher_write (new_session->dispatcher, greetingbuf, strlen (greetingbuf), FALSE, FALSE)) {
  1126. msg_warn ("cannot write greeting");
  1127. }
  1128. }
  1129. gpointer
  1130. init_controller ()
  1131. {
  1132. struct rspamd_controller_ctx *ctx;
  1133. GQuark type;
  1134. type = g_quark_try_string ("controller");
  1135. ctx = g_malloc0 (sizeof (struct rspamd_controller_ctx));
  1136. ctx->timeout = CONTROLLER_IO_TIMEOUT * 1000;
  1137. register_worker_opt (type, "password", xml_handle_string, ctx, G_STRUCT_OFFSET (struct rspamd_controller_ctx, password));
  1138. register_worker_opt (type, "timeout", xml_handle_seconds, ctx, G_STRUCT_OFFSET (struct rspamd_controller_ctx, timeout));
  1139. return ctx;
  1140. }
  1141. void
  1142. start_controller (struct rspamd_worker *worker)
  1143. {
  1144. struct sigaction signals;
  1145. gchar *hostbuf;
  1146. gsize hostmax;
  1147. struct rspamd_controller_ctx *ctx;
  1148. worker->srv->pid = getpid ();
  1149. ctx = worker->ctx;
  1150. ctx->ev_base = event_init ();
  1151. g_mime_init (0);
  1152. init_signals (&signals, sig_handler);
  1153. sigprocmask (SIG_UNBLOCK, &signals.sa_mask, NULL);
  1154. /* SIGUSR2 handler */
  1155. signal_set (&worker->sig_ev_usr2, SIGUSR2, sigusr2_handler, (void *) worker);
  1156. event_base_set (ctx->ev_base, &worker->sig_ev_usr2);
  1157. signal_add (&worker->sig_ev_usr2, NULL);
  1158. /* SIGUSR1 handler */
  1159. signal_set (&worker->sig_ev_usr1, SIGUSR1, sigusr1_handler, (void *) worker);
  1160. event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
  1161. signal_add (&worker->sig_ev_usr1, NULL);
  1162. start_time = time (NULL);
  1163. /* Start statfile synchronization */
  1164. if (!start_statfile_sync (worker->srv->statfile_pool, worker->srv->cfg, ctx->ev_base)) {
  1165. msg_info ("cannot start statfile synchronization, statfiles would not be synchronized");
  1166. }
  1167. /* Fill hostname buf */
  1168. hostmax = sysconf (_SC_HOST_NAME_MAX) + 1;
  1169. hostbuf = alloca (hostmax);
  1170. gethostname (hostbuf, hostmax);
  1171. hostbuf[hostmax - 1] = '\0';
  1172. rspamd_snprintf (greetingbuf, sizeof (greetingbuf), "Rspamd version %s is running on %s" CRLF, RVERSION, hostbuf);
  1173. /* Accept event */
  1174. event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
  1175. event_base_set (ctx->ev_base, &worker->bind_ev);
  1176. event_add (&worker->bind_ev, NULL);
  1177. start_map_watch (ctx->ev_base);
  1178. ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
  1179. gperf_profiler_init (worker->srv->cfg, "controller");
  1180. event_base_loop (ctx->ev_base, 0);
  1181. close_log (worker->srv->logger);
  1182. exit (EXIT_SUCCESS);
  1183. }
  1184. void
  1185. register_custom_controller_command (const gchar *name, controller_func_t handler, gboolean privilleged, gboolean require_message)
  1186. {
  1187. struct custom_controller_command *cmd;
  1188. cmd = g_malloc (sizeof (struct custom_controller_command));
  1189. cmd->command = name;
  1190. cmd->handler = handler;
  1191. cmd->privilleged = privilleged;
  1192. cmd->require_message = require_message;
  1193. custom_commands = g_list_prepend (custom_commands, cmd);
  1194. }
  1195. /*
  1196. * vi:ts=4
  1197. */