You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

task.c 41KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "task.h"
  17. #include "rspamd.h"
  18. #include "scan_result.h"
  19. #include "libserver/protocol.h"
  20. #include "libserver/protocol_internal.h"
  21. #include "message.h"
  22. #include "lua/lua_common.h"
  23. #include "email_addr.h"
  24. #include "composites.h"
  25. #include "stat_api.h"
  26. #include "unix-std.h"
  27. #include "utlist.h"
  28. #include "contrib/zstd/zstd.h"
  29. #include "libserver/mempool_vars_internal.h"
  30. #include "libserver/cfg_file_private.h"
  31. #include "libmime/lang_detection.h"
  32. #include "libmime/scan_result_private.h"
  33. #ifdef WITH_JEMALLOC
  34. #include <jemalloc/jemalloc.h>
  35. #else
  36. # if defined(__GLIBC__) && defined(_GNU_SOURCE)
  37. # include <malloc.h>
  38. # endif
  39. #endif
  40. #include <math.h>
  /*
   * Emit the khash implementation for the `rspamd_req_headers_hash` table type
   * with `static inline` linkage. Keys are `rspamd_ftok_t *`, values are
   * `struct rspamd_request_header_chain *`; hashing and comparison are done by
   * rspamd_ftok_icase_hash / rspamd_ftok_icase_equal (presumably
   * case-insensitive, judging by the names - confirm against their definitions).
   */
  __KHASH_IMPL (rspamd_req_headers_hash, static inline,
  rspamd_ftok_t *, struct rspamd_request_header_chain *, 1,
  rspamd_ftok_icase_hash, rspamd_ftok_icase_equal)
  /*
   * Do not print more than this amount of elements.
   * (The users of this limit are outside the part of the file shown here.)
   */
  static const int max_log_elts = 7;
  48. static GQuark
  49. rspamd_task_quark (void)
  50. {
  51. return g_quark_from_static_string ("task-error");
  52. }
  53. /*
  54. * Create new task
  55. */
  56. struct rspamd_task *
  57. rspamd_task_new (struct rspamd_worker *worker, struct rspamd_config *cfg,
  58. rspamd_mempool_t *pool,
  59. struct rspamd_lang_detector *lang_det,
  60. struct ev_loop *event_loop)
  61. {
  62. struct rspamd_task *new_task;
  63. rspamd_mempool_t *task_pool;
  64. guint flags = 0;
  65. if (pool == NULL) {
  66. task_pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), "task");
  67. flags |= RSPAMD_TASK_FLAG_OWN_POOL;
  68. }
  69. else {
  70. task_pool = pool;
  71. }
  72. new_task = rspamd_mempool_alloc0 (task_pool, sizeof (struct rspamd_task));
  73. new_task->task_pool = task_pool;
  74. new_task->flags = flags;
  75. new_task->worker = worker;
  76. new_task->lang_det = lang_det;
  77. if (cfg) {
  78. new_task->cfg = cfg;
  79. REF_RETAIN (cfg);
  80. if (cfg->check_all_filters) {
  81. new_task->flags |= RSPAMD_TASK_FLAG_PASS_ALL;
  82. }
  83. if (cfg->re_cache) {
  84. new_task->re_rt = rspamd_re_cache_runtime_new (cfg->re_cache);
  85. }
  86. if (new_task->lang_det == NULL && cfg->lang_det != NULL) {
  87. new_task->lang_det = cfg->lang_det;
  88. }
  89. }
  90. new_task->event_loop = event_loop;
  91. new_task->task_timestamp = ev_time ();
  92. new_task->time_real_finish = NAN;
  93. new_task->request_headers = kh_init (rspamd_req_headers_hash);
  94. new_task->sock = -1;
  95. new_task->flags |= (RSPAMD_TASK_FLAG_MIME);
  96. new_task->result = rspamd_create_metric_result (new_task);
  97. new_task->queue_id = "undef";
  98. new_task->messages = ucl_object_typed_new (UCL_OBJECT);
  99. new_task->lua_cache = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
  100. return new_task;
  101. }
  102. static void
  103. rspamd_task_reply (struct rspamd_task *task)
  104. {
  105. const ev_tstamp write_timeout = 2.0;
  106. if (task->fin_callback) {
  107. task->fin_callback (task, task->fin_arg);
  108. }
  109. else {
  110. if (!(task->processed_stages & RSPAMD_TASK_STAGE_REPLIED)) {
  111. rspamd_protocol_write_reply (task, write_timeout);
  112. }
  113. }
  114. }
  115. /*
  116. * Called if all filters are processed
  117. * @return TRUE if session should be terminated
  118. */
  119. gboolean
  120. rspamd_task_fin (void *arg)
  121. {
  122. struct rspamd_task *task = (struct rspamd_task *) arg;
  123. /* Task is already finished or skipped */
  124. if (RSPAMD_TASK_IS_PROCESSED (task)) {
  125. rspamd_task_reply (task);
  126. return TRUE;
  127. }
  128. if (!rspamd_task_process (task, RSPAMD_TASK_PROCESS_ALL)) {
  129. rspamd_task_reply (task);
  130. return TRUE;
  131. }
  132. if (RSPAMD_TASK_IS_PROCESSED (task)) {
  133. rspamd_task_reply (task);
  134. return TRUE;
  135. }
  136. /* One more iteration */
  137. return FALSE;
  138. }
  /*
   * Called if session was restored inside fin callback.
   * Intentionally does nothing at the moment.
   */
  void
  rspamd_task_restore (void *arg)
  {
      /* XXX: not needed now ? */
      (void) arg;
  }
  147. /*
  148. * Free all structures of worker_task
  149. */
  150. void
  151. rspamd_task_free (struct rspamd_task *task)
  152. {
  153. struct rspamd_email_address *addr;
  154. struct rspamd_lua_cached_entry *entry;
  155. static guint free_iters = 0;
  156. GHashTableIter it;
  157. gpointer k, v;
  158. guint i;
  159. if (task) {
  160. debug_task ("free pointer %p", task);
  161. if (task->rcpt_envelope) {
  162. for (i = 0; i < task->rcpt_envelope->len; i ++) {
  163. addr = g_ptr_array_index (task->rcpt_envelope, i);
  164. rspamd_email_address_free (addr);
  165. }
  166. g_ptr_array_free (task->rcpt_envelope, TRUE);
  167. }
  168. if (task->from_envelope) {
  169. rspamd_email_address_free (task->from_envelope);
  170. }
  171. if (task->meta_words) {
  172. g_array_free (task->meta_words, TRUE);
  173. }
  174. ucl_object_unref (task->messages);
  175. if (task->re_rt) {
  176. rspamd_re_cache_runtime_destroy (task->re_rt);
  177. }
  178. if (task->http_conn != NULL) {
  179. rspamd_http_connection_reset (task->http_conn);
  180. rspamd_http_connection_unref (task->http_conn);
  181. }
  182. if (task->settings != NULL) {
  183. ucl_object_unref (task->settings);
  184. }
  185. if (task->settings_elt != NULL) {
  186. REF_RELEASE (task->settings_elt);
  187. }
  188. if (task->client_addr) {
  189. rspamd_inet_address_free (task->client_addr);
  190. }
  191. if (task->from_addr) {
  192. rspamd_inet_address_free (task->from_addr);
  193. }
  194. if (task->err) {
  195. g_error_free (task->err);
  196. }
  197. ev_timer_stop (task->event_loop, &task->timeout_ev);
  198. ev_io_stop (task->event_loop, &task->guard_ev);
  199. if (task->sock != -1) {
  200. close (task->sock);
  201. }
  202. if (task->cfg) {
  203. if (task->lua_cache) {
  204. g_hash_table_iter_init (&it, task->lua_cache);
  205. while (g_hash_table_iter_next (&it, &k, &v)) {
  206. entry = (struct rspamd_lua_cached_entry *)v;
  207. luaL_unref (task->cfg->lua_state,
  208. LUA_REGISTRYINDEX, entry->ref);
  209. }
  210. g_hash_table_unref (task->lua_cache);
  211. }
  212. if (task->cfg->full_gc_iters && (++free_iters > task->cfg->full_gc_iters)) {
  213. /* Perform more expensive cleanup cycle */
  214. gsize allocated = 0, active = 0, metadata = 0,
  215. resident = 0, mapped = 0, old_lua_mem = 0;
  216. gdouble t1, t2;
  217. old_lua_mem = lua_gc (task->cfg->lua_state, LUA_GCCOUNT, 0);
  218. t1 = rspamd_get_ticks (FALSE);
  219. #ifdef WITH_JEMALLOC
  220. gsize sz = sizeof (gsize);
  221. mallctl ("stats.allocated", &allocated, &sz, NULL, 0);
  222. mallctl ("stats.active", &active, &sz, NULL, 0);
  223. mallctl ("stats.metadata", &metadata, &sz, NULL, 0);
  224. mallctl ("stats.resident", &resident, &sz, NULL, 0);
  225. mallctl ("stats.mapped", &mapped, &sz, NULL, 0);
  226. #else
  227. # if defined(__GLIBC__) && defined(_GNU_SOURCE)
  228. malloc_trim (0);
  229. # endif
  230. #endif
  231. lua_gc (task->cfg->lua_state, LUA_GCCOLLECT, 0);
  232. t2 = rspamd_get_ticks (FALSE);
  233. msg_notice_task ("perform full gc cycle; memory stats: "
  234. "%Hz allocated, %Hz active, %Hz metadata, %Hz resident, %Hz mapped;"
  235. " lua memory: %z kb -> %d kb; %f ms for gc iter",
  236. allocated, active, metadata, resident, mapped,
  237. old_lua_mem, lua_gc (task->cfg->lua_state, LUA_GCCOUNT, 0),
  238. (t2 - t1) * 1000.0);
  239. free_iters = rspamd_time_jitter (0,
  240. (gdouble)task->cfg->full_gc_iters / 2);
  241. }
  242. REF_RELEASE (task->cfg);
  243. }
  244. kh_destroy (rspamd_req_headers_hash, task->request_headers);
  245. rspamd_message_unref (task->message);
  246. if (task->flags & RSPAMD_TASK_FLAG_OWN_POOL) {
  247. rspamd_mempool_delete (task->task_pool);
  248. }
  249. }
  250. }
  /*
   * Bookkeeping for a memory-mapped message: filled in when a message is
   * mmap()-ed and consumed by rspamd_task_unmapper(), which unmaps the region
   * and closes the descriptor.
   */
  struct rspamd_task_map {
  gpointer begin; /* start address returned by mmap() */
  gulong len; /* length of the mapping, passed to munmap() */
  gint fd; /* descriptor the mapping was created from */
  };
  256. static void
  257. rspamd_task_unmapper (gpointer ud)
  258. {
  259. struct rspamd_task_map *m = ud;
  260. munmap (m->begin, m->len);
  261. close (m->fd);
  262. }
  263. gboolean
  264. rspamd_task_load_message (struct rspamd_task *task,
  265. struct rspamd_http_message *msg, const gchar *start, gsize len)
  266. {
  267. guint control_len, r;
  268. struct ucl_parser *parser;
  269. ucl_object_t *control_obj;
  270. gchar filepath[PATH_MAX], *fp;
  271. gint fd, flen;
  272. gulong offset = 0, shmem_size = 0;
  273. rspamd_ftok_t *tok;
  274. gpointer map;
  275. struct stat st;
  276. struct rspamd_task_map *m;
  277. const gchar *ft;
  278. #ifdef HAVE_SANE_SHMEM
  279. ft = "shm";
  280. #else
  281. ft = "file";
  282. #endif
  283. if (msg) {
  284. rspamd_protocol_handle_headers (task, msg);
  285. }
  286. tok = rspamd_task_get_request_header (task, "shm");
  287. if (tok) {
  288. /* Shared memory part */
  289. r = rspamd_strlcpy (filepath, tok->begin,
  290. MIN (sizeof (filepath), tok->len + 1));
  291. rspamd_url_decode (filepath, filepath, r + 1);
  292. flen = strlen (filepath);
  293. if (filepath[0] == '"' && flen > 2) {
  294. /* We need to unquote filepath */
  295. fp = &filepath[1];
  296. fp[flen - 2] = '\0';
  297. }
  298. else {
  299. fp = &filepath[0];
  300. }
  301. #ifdef HAVE_SANE_SHMEM
  302. fd = shm_open (fp, O_RDONLY, 00600);
  303. #else
  304. fd = open (fp, O_RDONLY, 00600);
  305. #endif
  306. if (fd == -1) {
  307. g_set_error (&task->err, rspamd_task_quark(), RSPAMD_PROTOCOL_ERROR,
  308. "Cannot open %s segment (%s): %s", ft, fp, strerror (errno));
  309. return FALSE;
  310. }
  311. if (fstat (fd, &st) == -1) {
  312. g_set_error (&task->err, rspamd_task_quark(), RSPAMD_PROTOCOL_ERROR,
  313. "Cannot stat %s segment (%s): %s", ft, fp, strerror (errno));
  314. close (fd);
  315. return FALSE;
  316. }
  317. map = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0);
  318. if (map == MAP_FAILED) {
  319. close (fd);
  320. g_set_error (&task->err, rspamd_task_quark(), RSPAMD_PROTOCOL_ERROR,
  321. "Cannot mmap %s (%s): %s", ft, fp, strerror (errno));
  322. return FALSE;
  323. }
  324. tok = rspamd_task_get_request_header (task, "shm-offset");
  325. if (tok) {
  326. rspamd_strtoul (tok->begin, tok->len, &offset);
  327. if (offset > (gulong)st.st_size) {
  328. msg_err_task ("invalid offset %ul (%ul available) for shm "
  329. "segment %s", offset, (gulong)st.st_size, fp);
  330. munmap (map, st.st_size);
  331. close (fd);
  332. return FALSE;
  333. }
  334. }
  335. tok = rspamd_task_get_request_header (task, "shm-length");
  336. shmem_size = st.st_size;
  337. if (tok) {
  338. rspamd_strtoul (tok->begin, tok->len, &shmem_size);
  339. if (shmem_size > (gulong)st.st_size) {
  340. msg_err_task ("invalid length %ul (%ul available) for %s "
  341. "segment %s", shmem_size, (gulong)st.st_size, ft, fp);
  342. munmap (map, st.st_size);
  343. close (fd);
  344. return FALSE;
  345. }
  346. }
  347. task->msg.begin = ((guchar *)map) + offset;
  348. task->msg.len = shmem_size;
  349. m = rspamd_mempool_alloc (task->task_pool, sizeof (*m));
  350. m->begin = map;
  351. m->len = st.st_size;
  352. m->fd = fd;
  353. msg_info_task ("loaded message from shared memory %s (%ul size, %ul offset), fd=%d",
  354. fp, shmem_size, offset, fd);
  355. rspamd_mempool_add_destructor (task->task_pool, rspamd_task_unmapper, m);
  356. return TRUE;
  357. }
  358. tok = rspamd_task_get_request_header (task, "file");
  359. if (tok == NULL) {
  360. tok = rspamd_task_get_request_header (task, "path");
  361. }
  362. if (tok) {
  363. debug_task ("want to scan file %T", tok);
  364. r = rspamd_strlcpy (filepath, tok->begin,
  365. MIN (sizeof (filepath), tok->len + 1));
  366. rspamd_url_decode (filepath, filepath, r + 1);
  367. flen = strlen (filepath);
  368. if (filepath[0] == '"' && flen > 2) {
  369. /* We need to unquote filepath */
  370. fp = &filepath[1];
  371. fp[flen - 2] = '\0';
  372. }
  373. else {
  374. fp = &filepath[0];
  375. }
  376. if (stat (fp, &st) == -1) {
  377. g_set_error (&task->err, rspamd_task_quark(), RSPAMD_PROTOCOL_ERROR,
  378. "Invalid file (%s): %s", fp, strerror (errno));
  379. return FALSE;
  380. }
  381. if (G_UNLIKELY (st.st_size == 0)) {
  382. /* Empty file */
  383. task->flags |= RSPAMD_TASK_FLAG_EMPTY;
  384. task->msg.begin = rspamd_mempool_strdup (task->task_pool, "");
  385. task->msg.len = 0;
  386. }
  387. else {
  388. fd = open (fp, O_RDONLY);
  389. if (fd == -1) {
  390. g_set_error (&task->err, rspamd_task_quark (),
  391. RSPAMD_PROTOCOL_ERROR,
  392. "Cannot open file (%s): %s", fp, strerror (errno));
  393. return FALSE;
  394. }
  395. map = mmap (NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0);
  396. if (map == MAP_FAILED) {
  397. close (fd);
  398. g_set_error (&task->err, rspamd_task_quark (),
  399. RSPAMD_PROTOCOL_ERROR,
  400. "Cannot mmap file (%s): %s", fp, strerror (errno));
  401. return FALSE;
  402. }
  403. task->msg.begin = map;
  404. task->msg.len = st.st_size;
  405. m = rspamd_mempool_alloc (task->task_pool, sizeof (*m));
  406. m->begin = map;
  407. m->len = st.st_size;
  408. m->fd = fd;
  409. rspamd_mempool_add_destructor (task->task_pool, rspamd_task_unmapper, m);
  410. }
  411. task->msg.fpath = rspamd_mempool_strdup (task->task_pool, fp);
  412. task->flags |= RSPAMD_TASK_FLAG_FILE;
  413. msg_info_task ("loaded message from file %s", fp);
  414. return TRUE;
  415. }
  416. /* Plain data */
  417. debug_task ("got input of length %z", task->msg.len);
  418. /* Check compression */
  419. tok = rspamd_task_get_request_header (task, "compression");
  420. if (tok) {
  421. /* Need to uncompress */
  422. rspamd_ftok_t t;
  423. t.begin = "zstd";
  424. t.len = 4;
  425. if (rspamd_ftok_casecmp (tok, &t) == 0) {
  426. ZSTD_DStream *zstream;
  427. ZSTD_inBuffer zin;
  428. ZSTD_outBuffer zout;
  429. guchar *out;
  430. gsize outlen, r;
  431. gulong dict_id;
  432. if (!rspamd_libs_reset_decompression (task->cfg->libs_ctx)) {
  433. g_set_error (&task->err, rspamd_task_quark(),
  434. RSPAMD_PROTOCOL_ERROR,
  435. "Cannot decompress, decompressor init failed");
  436. return FALSE;
  437. }
  438. tok = rspamd_task_get_request_header (task, "dictionary");
  439. if (tok != NULL) {
  440. /* We need to use custom dictionary */
  441. if (!rspamd_strtoul (tok->begin, tok->len, &dict_id)) {
  442. g_set_error (&task->err, rspamd_task_quark(), RSPAMD_PROTOCOL_ERROR,
  443. "Non numeric dictionary");
  444. return FALSE;
  445. }
  446. if (!task->cfg->libs_ctx->in_dict) {
  447. g_set_error (&task->err, rspamd_task_quark(), RSPAMD_PROTOCOL_ERROR,
  448. "Unknown dictionary, undefined locally");
  449. return FALSE;
  450. }
  451. if (task->cfg->libs_ctx->in_dict->id != dict_id) {
  452. g_set_error (&task->err, rspamd_task_quark(), RSPAMD_PROTOCOL_ERROR,
  453. "Unknown dictionary, invalid dictionary id");
  454. return FALSE;
  455. }
  456. }
  457. zstream = task->cfg->libs_ctx->in_zstream;
  458. zin.pos = 0;
  459. zin.src = start;
  460. zin.size = len;
  461. if ((outlen = ZSTD_getDecompressedSize (start, len)) == 0) {
  462. outlen = ZSTD_DStreamOutSize ();
  463. }
  464. out = g_malloc (outlen);
  465. zout.dst = out;
  466. zout.pos = 0;
  467. zout.size = outlen;
  468. while (zin.pos < zin.size) {
  469. r = ZSTD_decompressStream (zstream, &zout, &zin);
  470. if (ZSTD_isError (r)) {
  471. g_set_error (&task->err, rspamd_task_quark(),
  472. RSPAMD_PROTOCOL_ERROR,
  473. "Decompression error: %s", ZSTD_getErrorName (r));
  474. return FALSE;
  475. }
  476. if (zout.pos == zout.size) {
  477. /* We need to extend output buffer */
  478. zout.size = zout.size * 2 + 1;
  479. zout.dst = g_realloc (zout.dst, zout.size);
  480. }
  481. }
  482. rspamd_mempool_add_destructor (task->task_pool, g_free, zout.dst);
  483. task->msg.begin = zout.dst;
  484. task->msg.len = zout.pos;
  485. task->protocol_flags |= RSPAMD_TASK_PROTOCOL_FLAG_COMPRESSED;
  486. msg_info_task ("loaded message from zstd compressed stream; "
  487. "compressed: %ul; uncompressed: %ul",
  488. (gulong)zin.size, (gulong)zout.pos);
  489. }
  490. else {
  491. g_set_error (&task->err, rspamd_task_quark(), RSPAMD_PROTOCOL_ERROR,
  492. "Invalid compression method");
  493. return FALSE;
  494. }
  495. }
  496. else {
  497. task->msg.begin = start;
  498. task->msg.len = len;
  499. }
  500. if (task->msg.len == 0) {
  501. task->flags |= RSPAMD_TASK_FLAG_EMPTY;
  502. }
  503. if (task->protocol_flags & RSPAMD_TASK_PROTOCOL_FLAG_HAS_CONTROL) {
  504. rspamd_ftok_t *hv = rspamd_task_get_request_header (task, MLEN_HEADER);
  505. gulong message_len = 0;
  506. if (!hv || !rspamd_strtoul (hv->begin, hv->len, &message_len) ||
  507. task->msg.len < message_len) {
  508. msg_warn_task ("message has invalid message length: %ul and total len: %ul",
  509. message_len, task->msg.len);
  510. g_set_error (&task->err, rspamd_task_quark(), RSPAMD_PROTOCOL_ERROR,
  511. "Invalid length");
  512. return FALSE;
  513. }
  514. control_len = task->msg.len - message_len;
  515. if (control_len > 0) {
  516. parser = ucl_parser_new (UCL_PARSER_KEY_LOWERCASE);
  517. if (!ucl_parser_add_chunk (parser, task->msg.begin, control_len)) {
  518. msg_warn_task ("processing of control chunk failed: %s",
  519. ucl_parser_get_error (parser));
  520. ucl_parser_free (parser);
  521. }
  522. else {
  523. control_obj = ucl_parser_get_object (parser);
  524. ucl_parser_free (parser);
  525. rspamd_protocol_handle_control (task, control_obj);
  526. ucl_object_unref (control_obj);
  527. }
  528. task->msg.begin += control_len;
  529. task->msg.len -= control_len;
  530. }
  531. }
  532. return TRUE;
  533. }
  534. static gint
  535. rspamd_task_select_processing_stage (struct rspamd_task *task, guint stages)
  536. {
  537. gint st, mask;
  538. mask = task->processed_stages;
  539. if (mask == 0) {
  540. st = 0;
  541. }
  542. else {
  543. for (st = 1; mask != 1; st ++) {
  544. mask = (unsigned int)mask >> 1;
  545. }
  546. }
  547. st = 1 << st;
  548. if (stages & st) {
  549. return st;
  550. }
  551. else if (st < RSPAMD_TASK_STAGE_DONE) {
  552. /* We assume that the stage that was not requested is done */
  553. task->processed_stages |= st;
  554. return rspamd_task_select_processing_stage (task, stages);
  555. }
  556. /* We are done */
  557. return RSPAMD_TASK_STAGE_DONE;
  558. }
  559. gboolean
  560. rspamd_task_process (struct rspamd_task *task, guint stages)
  561. {
  562. gint st;
  563. gboolean ret = TRUE, all_done = TRUE;
  564. GError *stat_error = NULL;
  565. /* Avoid nested calls */
  566. if (task->flags & RSPAMD_TASK_FLAG_PROCESSING) {
  567. return TRUE;
  568. }
  569. if (RSPAMD_TASK_IS_PROCESSED (task)) {
  570. return TRUE;
  571. }
  572. task->flags |= RSPAMD_TASK_FLAG_PROCESSING;
  573. st = rspamd_task_select_processing_stage (task, stages);
  574. switch (st) {
  575. case RSPAMD_TASK_STAGE_READ_MESSAGE:
  576. if (!rspamd_message_parse (task)) {
  577. ret = FALSE;
  578. }
  579. break;
  580. case RSPAMD_TASK_STAGE_PRE_FILTERS_EMPTY:
  581. case RSPAMD_TASK_STAGE_PRE_FILTERS:
  582. case RSPAMD_TASK_STAGE_FILTERS:
  583. case RSPAMD_TASK_STAGE_IDEMPOTENT:
  584. all_done = rspamd_symcache_process_symbols (task, task->cfg->cache, st);
  585. break;
  586. case RSPAMD_TASK_STAGE_PROCESS_MESSAGE:
  587. if (!(task->flags & RSPAMD_TASK_FLAG_SKIP_PROCESS)) {
  588. rspamd_message_process (task);
  589. }
  590. break;
  591. case RSPAMD_TASK_STAGE_CLASSIFIERS:
  592. case RSPAMD_TASK_STAGE_CLASSIFIERS_PRE:
  593. case RSPAMD_TASK_STAGE_CLASSIFIERS_POST:
  594. if (!RSPAMD_TASK_IS_EMPTY (task)) {
  595. if (rspamd_stat_classify (task, task->cfg->lua_state, st, &stat_error) ==
  596. RSPAMD_STAT_PROCESS_ERROR) {
  597. msg_err_task ("classify error: %e", stat_error);
  598. g_error_free (stat_error);
  599. }
  600. }
  601. break;
  602. case RSPAMD_TASK_STAGE_COMPOSITES:
  603. rspamd_make_composites (task);
  604. break;
  605. case RSPAMD_TASK_STAGE_POST_FILTERS:
  606. all_done = rspamd_symcache_process_symbols (task, task->cfg->cache,
  607. st);
  608. if (all_done && (task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO) &&
  609. !RSPAMD_TASK_IS_EMPTY (task) &&
  610. !(task->flags & (RSPAMD_TASK_FLAG_LEARN_SPAM|RSPAMD_TASK_FLAG_LEARN_HAM))) {
  611. rspamd_stat_check_autolearn (task);
  612. }
  613. break;
  614. case RSPAMD_TASK_STAGE_LEARN:
  615. case RSPAMD_TASK_STAGE_LEARN_PRE:
  616. case RSPAMD_TASK_STAGE_LEARN_POST:
  617. if (task->flags & (RSPAMD_TASK_FLAG_LEARN_SPAM|RSPAMD_TASK_FLAG_LEARN_HAM)) {
  618. if (task->err == NULL) {
  619. if (!rspamd_stat_learn (task,
  620. task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM,
  621. task->cfg->lua_state, task->classifier,
  622. st, &stat_error)) {
  623. if (stat_error == NULL) {
  624. g_set_error (&stat_error,
  625. g_quark_from_static_string ("stat"), 500,
  626. "Unknown statistics error, found on stage %s;"
  627. " classifier: %s",
  628. rspamd_task_stage_name (st), task->classifier);
  629. }
  630. if (stat_error->code >= 400) {
  631. msg_err_task ("learn error: %e", stat_error);
  632. }
  633. else {
  634. msg_notice_task ("skip learning: %e", stat_error);
  635. }
  636. if (!(task->flags & RSPAMD_TASK_FLAG_LEARN_AUTO)) {
  637. task->err = stat_error;
  638. task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
  639. }
  640. else {
  641. /* Do not skip idempotent in case of learn error */
  642. if (stat_error) {
  643. g_error_free (stat_error);
  644. }
  645. task->processed_stages |= RSPAMD_TASK_STAGE_LEARN|
  646. RSPAMD_TASK_STAGE_LEARN_PRE|
  647. RSPAMD_TASK_STAGE_LEARN_POST;
  648. }
  649. }
  650. }
  651. }
  652. break;
  653. case RSPAMD_TASK_STAGE_COMPOSITES_POST:
  654. /* Second run of composites processing before idempotent filters */
  655. rspamd_make_composites (task);
  656. break;
  657. case RSPAMD_TASK_STAGE_DONE:
  658. task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
  659. break;
  660. default:
  661. /* TODO: not implemented stage */
  662. break;
  663. }
  664. if (RSPAMD_TASK_IS_SKIPPED (task)) {
  665. /* Set all bits except idempotent filters */
  666. task->processed_stages |= 0x7FFF;
  667. }
  668. task->flags &= ~RSPAMD_TASK_FLAG_PROCESSING;
  669. if (!ret || RSPAMD_TASK_IS_PROCESSED (task)) {
  670. if (!ret) {
  671. /* Set processed flags */
  672. task->processed_stages |= RSPAMD_TASK_STAGE_DONE;
  673. }
  674. msg_debug_task ("task is processed");
  675. return ret;
  676. }
  677. if (ret) {
  678. if (rspamd_session_events_pending (task->s) != 0) {
  679. /* We have events pending, so we consider this stage as incomplete */
  680. msg_debug_task ("need more work on stage %d", st);
  681. }
  682. else {
  683. if (all_done) {
  684. /* Mark the current stage as done and go to the next stage */
  685. msg_debug_task ("completed stage %d", st);
  686. task->processed_stages |= st;
  687. }
  688. else {
  689. msg_debug_task ("need more processing on stage %d", st);
  690. }
  691. /* Tail recursion */
  692. return rspamd_task_process (task, stages);
  693. }
  694. }
  695. return ret;
  696. }
  697. struct rspamd_email_address*
  698. rspamd_task_get_sender (struct rspamd_task *task)
  699. {
  700. return task->from_envelope;
  701. }
  702. static const gchar *
  703. rspamd_task_cache_principal_recipient (struct rspamd_task *task,
  704. const gchar *rcpt, gsize len)
  705. {
  706. gchar *rcpt_lc;
  707. if (rcpt == NULL) {
  708. return NULL;
  709. }
  710. rcpt_lc = rspamd_mempool_alloc (task->task_pool, len + 1);
  711. rspamd_strlcpy (rcpt_lc, rcpt, len + 1);
  712. rspamd_str_lc (rcpt_lc, len);
  713. rspamd_mempool_set_variable (task->task_pool,
  714. RSPAMD_MEMPOOL_PRINCIPAL_RECIPIENT, rcpt_lc, NULL);
  715. return rcpt_lc;
  716. }
  717. const gchar *
  718. rspamd_task_get_principal_recipient (struct rspamd_task *task)
  719. {
  720. const gchar *val;
  721. struct rspamd_email_address *addr;
  722. guint i;
  723. val = rspamd_mempool_get_variable (task->task_pool,
  724. RSPAMD_MEMPOOL_PRINCIPAL_RECIPIENT);
  725. if (val) {
  726. return val;
  727. }
  728. if (task->deliver_to) {
  729. return rspamd_task_cache_principal_recipient (task, task->deliver_to,
  730. strlen (task->deliver_to));
  731. }
  732. if (task->rcpt_envelope != NULL) {
  733. PTR_ARRAY_FOREACH (task->rcpt_envelope, i, addr) {
  734. if (addr->addr && !(addr->flags & RSPAMD_EMAIL_ADDR_ORIGINAL)) {
  735. return rspamd_task_cache_principal_recipient (task, addr->addr,
  736. addr->addr_len);
  737. }
  738. }
  739. }
  740. GPtrArray *rcpt_mime = MESSAGE_FIELD_CHECK (task, rcpt_mime);
  741. if (rcpt_mime != NULL && rcpt_mime->len > 0) {
  742. PTR_ARRAY_FOREACH (rcpt_mime, i, addr) {
  743. if (addr->addr && !(addr->flags & RSPAMD_EMAIL_ADDR_ORIGINAL)) {
  744. return rspamd_task_cache_principal_recipient (task, addr->addr,
  745. addr->addr_len);
  746. }
  747. }
  748. }
  749. return NULL;
  750. }
  751. gboolean
  752. rspamd_learn_task_spam (struct rspamd_task *task,
  753. gboolean is_spam,
  754. const gchar *classifier,
  755. GError **err)
  756. {
  757. if (is_spam) {
  758. task->flags |= RSPAMD_TASK_FLAG_LEARN_SPAM;
  759. }
  760. else {
  761. task->flags |= RSPAMD_TASK_FLAG_LEARN_HAM;
  762. }
  763. task->classifier = classifier;
  764. return TRUE;
  765. }
  766. static gboolean
  767. rspamd_task_log_check_condition (struct rspamd_task *task,
  768. struct rspamd_log_format *lf)
  769. {
  770. gboolean ret = FALSE;
  771. switch (lf->type) {
  772. case RSPAMD_LOG_MID:
  773. if (MESSAGE_FIELD_CHECK (task, message_id) &&
  774. strcmp (MESSAGE_FIELD (task, message_id) , "undef") != 0) {
  775. ret = TRUE;
  776. }
  777. break;
  778. case RSPAMD_LOG_QID:
  779. if (task->queue_id && strcmp (task->queue_id, "undef") != 0) {
  780. ret = TRUE;
  781. }
  782. break;
  783. case RSPAMD_LOG_USER:
  784. if (task->user) {
  785. ret = TRUE;
  786. }
  787. break;
  788. case RSPAMD_LOG_IP:
  789. if (task->from_addr && rspamd_ip_is_valid (task->from_addr)) {
  790. ret = TRUE;
  791. }
  792. break;
  793. case RSPAMD_LOG_SMTP_RCPT:
  794. case RSPAMD_LOG_SMTP_RCPTS:
  795. if (task->rcpt_envelope && task->rcpt_envelope->len > 0) {
  796. ret = TRUE;
  797. }
  798. break;
  799. case RSPAMD_LOG_MIME_RCPT:
  800. case RSPAMD_LOG_MIME_RCPTS:
  801. if (MESSAGE_FIELD_CHECK (task, rcpt_mime) &&
  802. MESSAGE_FIELD (task, rcpt_mime)->len > 0) {
  803. ret = TRUE;
  804. }
  805. break;
  806. case RSPAMD_LOG_SMTP_FROM:
  807. if (task->from_envelope) {
  808. ret = TRUE;
  809. }
  810. break;
  811. case RSPAMD_LOG_MIME_FROM:
  812. if (MESSAGE_FIELD_CHECK (task, from_mime) &&
  813. MESSAGE_FIELD (task, from_mime)->len > 0) {
  814. ret = TRUE;
  815. }
  816. break;
  817. case RSPAMD_LOG_FILENAME:
  818. if (task->msg.fpath) {
  819. ret = TRUE;
  820. }
  821. break;
  822. case RSPAMD_LOG_FORCED_ACTION:
  823. if (task->result->passthrough_result) {
  824. ret = TRUE;
  825. }
  826. break;
  827. case RSPAMD_LOG_SETTINGS_ID:
  828. if (task->settings_elt) {
  829. ret = TRUE;
  830. }
  831. break;
  832. default:
  833. ret = TRUE;
  834. break;
  835. }
  836. return ret;
  837. }
  838. /*
  839. * Sort by symbol's score -> name
  840. */
  841. static gint
  842. rspamd_task_compare_log_sym (gconstpointer a, gconstpointer b)
  843. {
  844. const struct rspamd_symbol_result *s1 = *(const struct rspamd_symbol_result **)a,
  845. *s2 = *(const struct rspamd_symbol_result **)b;
  846. gdouble w1, w2;
  847. w1 = fabs (s1->score);
  848. w2 = fabs (s2->score);
  849. if (w1 == w2 && s1->name && s2->name) {
  850. return strcmp (s1->name, s2->name);
  851. }
  852. return (w2 - w1) * 1000.0;
  853. }
  854. static gint
  855. rspamd_task_compare_log_group (gconstpointer a, gconstpointer b)
  856. {
  857. const struct rspamd_symbols_group *s1 = *(const struct rspamd_symbols_group **)a,
  858. *s2 = *(const struct rspamd_symbols_group **)b;
  859. return strcmp (s1->name, s2->name);
  860. }
  861. static rspamd_ftok_t
  862. rspamd_task_log_metric_res (struct rspamd_task *task,
  863. struct rspamd_log_format *lf)
  864. {
  865. static gchar scorebuf[32];
  866. rspamd_ftok_t res = {.begin = NULL, .len = 0};
  867. struct rspamd_scan_result *mres;
  868. gboolean first = TRUE;
  869. rspamd_fstring_t *symbuf;
  870. struct rspamd_symbol_result *sym;
  871. GPtrArray *sorted_symbols;
  872. struct rspamd_action *act;
  873. struct rspamd_symbols_group *gr;
  874. guint i, j;
  875. khiter_t k;
  876. mres = task->result;
  877. act = rspamd_check_action_metric (task);
  878. if (mres != NULL) {
  879. switch (lf->type) {
  880. case RSPAMD_LOG_ISSPAM:
  881. if (RSPAMD_TASK_IS_SKIPPED (task)) {
  882. res.begin = "S";
  883. }
  884. else if (!(act->flags & RSPAMD_ACTION_HAM)) {
  885. res.begin = "T";
  886. }
  887. else {
  888. res.begin = "F";
  889. }
  890. res.len = 1;
  891. break;
  892. case RSPAMD_LOG_ACTION:
  893. res.begin = act->name;
  894. res.len = strlen (res.begin);
  895. break;
  896. case RSPAMD_LOG_SCORES:
  897. res.len = rspamd_snprintf (scorebuf, sizeof (scorebuf), "%.2f/%.2f",
  898. mres->score, rspamd_task_get_required_score (task, mres));
  899. res.begin = scorebuf;
  900. break;
  901. case RSPAMD_LOG_SYMBOLS:
  902. symbuf = rspamd_fstring_sized_new (128);
  903. sorted_symbols = g_ptr_array_sized_new (kh_size (mres->symbols));
  904. kh_foreach_value_ptr (mres->symbols, sym, {
  905. if (!(sym->flags & RSPAMD_SYMBOL_RESULT_IGNORED)) {
  906. g_ptr_array_add (sorted_symbols, (gpointer)sym);
  907. }
  908. });
  909. g_ptr_array_sort (sorted_symbols, rspamd_task_compare_log_sym);
  910. for (i = 0; i < sorted_symbols->len; i ++) {
  911. sym = g_ptr_array_index (sorted_symbols, i);
  912. if (first) {
  913. rspamd_printf_fstring (&symbuf, "%s", sym->name);
  914. }
  915. else {
  916. rspamd_printf_fstring (&symbuf, ",%s", sym->name);
  917. }
  918. if (lf->flags & RSPAMD_LOG_FMT_FLAG_SYMBOLS_SCORES) {
  919. rspamd_printf_fstring (&symbuf, "(%.2f)", sym->score);
  920. }
  921. if (lf->flags & RSPAMD_LOG_FMT_FLAG_SYMBOLS_PARAMS) {
  922. rspamd_printf_fstring (&symbuf, "{");
  923. if (sym->options) {
  924. struct rspamd_symbol_option *opt;
  925. j = 0;
  926. DL_FOREACH (sym->opts_head, opt) {
  927. rspamd_printf_fstring (&symbuf, "%s;", opt->option);
  928. if (j >= max_log_elts) {
  929. rspamd_printf_fstring (&symbuf, "...;");
  930. break;
  931. }
  932. j ++;
  933. }
  934. }
  935. rspamd_printf_fstring (&symbuf, "}");
  936. }
  937. first = FALSE;
  938. }
  939. g_ptr_array_free (sorted_symbols, TRUE);
  940. rspamd_mempool_add_destructor (task->task_pool,
  941. (rspamd_mempool_destruct_t)rspamd_fstring_free,
  942. symbuf);
  943. res.begin = symbuf->str;
  944. res.len = symbuf->len;
  945. break;
  946. case RSPAMD_LOG_GROUPS:
  947. case RSPAMD_LOG_PUBLIC_GROUPS:
  948. symbuf = rspamd_fstring_sized_new (128);
  949. sorted_symbols = g_ptr_array_sized_new (kh_size (mres->sym_groups));
  950. kh_foreach_key (mres->sym_groups, gr,{
  951. if (!(gr->flags & RSPAMD_SYMBOL_GROUP_PUBLIC)) {
  952. if (lf->type == RSPAMD_LOG_PUBLIC_GROUPS) {
  953. continue;
  954. }
  955. }
  956. g_ptr_array_add (sorted_symbols, gr);
  957. });
  958. g_ptr_array_sort (sorted_symbols, rspamd_task_compare_log_group);
  959. for (i = 0; i < sorted_symbols->len; i++) {
  960. gr = g_ptr_array_index (sorted_symbols, i);
  961. if (first) {
  962. rspamd_printf_fstring (&symbuf, "%s", gr->name);
  963. }
  964. else {
  965. rspamd_printf_fstring (&symbuf, ",%s", gr->name);
  966. }
  967. k = kh_get (rspamd_symbols_group_hash, mres->sym_groups, gr);
  968. rspamd_printf_fstring (&symbuf, "(%.2f)",
  969. kh_value (mres->sym_groups, k));
  970. first = FALSE;
  971. }
  972. g_ptr_array_free (sorted_symbols, TRUE);
  973. rspamd_mempool_add_destructor (task->task_pool,
  974. (rspamd_mempool_destruct_t) rspamd_fstring_free,
  975. symbuf);
  976. res.begin = symbuf->str;
  977. res.len = symbuf->len;
  978. break;
  979. default:
  980. break;
  981. }
  982. }
  983. return res;
  984. }
  985. static rspamd_fstring_t *
  986. rspamd_task_log_write_var (struct rspamd_task *task, rspamd_fstring_t *logbuf,
  987. const rspamd_ftok_t *var, const rspamd_ftok_t *content)
  988. {
  989. rspamd_fstring_t *res = logbuf;
  990. const gchar *p, *c, *end;
  991. if (content == NULL) {
  992. /* Just output variable */
  993. res = rspamd_fstring_append (res, var->begin, var->len);
  994. }
  995. else {
  996. /* Replace $ with variable value */
  997. p = content->begin;
  998. c = p;
  999. end = p + content->len;
  1000. while (p < end) {
  1001. if (*p == '$') {
  1002. if (p > c) {
  1003. res = rspamd_fstring_append (res, c, p - c);
  1004. }
  1005. res = rspamd_fstring_append (res, var->begin, var->len);
  1006. p ++;
  1007. c = p;
  1008. }
  1009. else {
  1010. p ++;
  1011. }
  1012. }
  1013. if (p > c) {
  1014. res = rspamd_fstring_append (res, c, p - c);
  1015. }
  1016. }
  1017. return res;
  1018. }
  1019. static rspamd_fstring_t *
  1020. rspamd_task_write_ialist (struct rspamd_task *task,
  1021. GPtrArray *addrs, gint lim,
  1022. struct rspamd_log_format *lf,
  1023. rspamd_fstring_t *logbuf)
  1024. {
  1025. rspamd_fstring_t *res = logbuf, *varbuf;
  1026. rspamd_ftok_t var = {.begin = NULL, .len = 0};
  1027. struct rspamd_email_address *addr;
  1028. gint i, nchars = 0, wr = 0, cur_chars;
  1029. gboolean has_orig = FALSE;
  1030. if (addrs && lim <= 0) {
  1031. lim = addrs->len;
  1032. }
  1033. PTR_ARRAY_FOREACH (addrs, i, addr) {
  1034. if (addr->flags & RSPAMD_EMAIL_ADDR_ORIGINAL) {
  1035. has_orig = TRUE;
  1036. break;
  1037. }
  1038. }
  1039. varbuf = rspamd_fstring_new ();
  1040. PTR_ARRAY_FOREACH (addrs, i, addr) {
  1041. if (wr >= lim) {
  1042. break;
  1043. }
  1044. if (has_orig) {
  1045. /* Report merely original addresses */
  1046. if (!(addr->flags & RSPAMD_EMAIL_ADDR_ORIGINAL)) {
  1047. continue;
  1048. }
  1049. }
  1050. cur_chars = addr->addr_len;
  1051. varbuf = rspamd_fstring_append (varbuf, addr->addr,
  1052. cur_chars);
  1053. nchars += cur_chars;
  1054. wr ++;
  1055. if (varbuf->len > 0) {
  1056. if (i != lim - 1) {
  1057. varbuf = rspamd_fstring_append (varbuf, ",", 1);
  1058. }
  1059. }
  1060. if (wr >= max_log_elts || nchars >= max_log_elts * 10) {
  1061. varbuf = rspamd_fstring_append (varbuf, "...", 3);
  1062. break;
  1063. }
  1064. }
  1065. if (varbuf->len > 0) {
  1066. var.begin = varbuf->str;
  1067. var.len = varbuf->len;
  1068. res = rspamd_task_log_write_var (task, logbuf,
  1069. &var, (const rspamd_ftok_t *) lf->data);
  1070. }
  1071. rspamd_fstring_free (varbuf);
  1072. return res;
  1073. }
  1074. static rspamd_fstring_t *
  1075. rspamd_task_write_addr_list (struct rspamd_task *task,
  1076. GPtrArray *addrs, gint lim,
  1077. struct rspamd_log_format *lf,
  1078. rspamd_fstring_t *logbuf)
  1079. {
  1080. rspamd_fstring_t *res = logbuf, *varbuf;
  1081. rspamd_ftok_t var = {.begin = NULL, .len = 0};
  1082. struct rspamd_email_address *addr;
  1083. gint i;
  1084. if (lim <= 0) {
  1085. lim = addrs->len;
  1086. }
  1087. varbuf = rspamd_fstring_new ();
  1088. for (i = 0; i < lim; i++) {
  1089. addr = g_ptr_array_index (addrs, i);
  1090. if (addr->addr) {
  1091. varbuf = rspamd_fstring_append (varbuf, addr->addr, addr->addr_len);
  1092. }
  1093. if (varbuf->len > 0) {
  1094. if (i != lim - 1) {
  1095. varbuf = rspamd_fstring_append (varbuf, ",", 1);
  1096. }
  1097. }
  1098. if (i >= max_log_elts) {
  1099. varbuf = rspamd_fstring_append (varbuf, "...", 3);
  1100. break;
  1101. }
  1102. }
  1103. if (varbuf->len > 0) {
  1104. var.begin = varbuf->str;
  1105. var.len = varbuf->len;
  1106. res = rspamd_task_log_write_var (task, logbuf,
  1107. &var, (const rspamd_ftok_t *) lf->data);
  1108. }
  1109. rspamd_fstring_free (varbuf);
  1110. return res;
  1111. }
  1112. static rspamd_fstring_t *
  1113. rspamd_task_log_variable (struct rspamd_task *task,
  1114. struct rspamd_log_format *lf, rspamd_fstring_t *logbuf)
  1115. {
  1116. rspamd_fstring_t *res = logbuf;
  1117. rspamd_ftok_t var = {.begin = NULL, .len = 0};
  1118. static gchar numbuf[128];
  1119. static const gchar undef[] = "undef";
  1120. switch (lf->type) {
  1121. /* String vars */
  1122. case RSPAMD_LOG_MID:
  1123. if (MESSAGE_FIELD_CHECK (task, message_id)) {
  1124. var.begin = MESSAGE_FIELD (task, message_id);
  1125. var.len = strlen (var.begin);
  1126. }
  1127. else {
  1128. var.begin = undef;
  1129. var.len = sizeof (undef) - 1;
  1130. }
  1131. break;
  1132. case RSPAMD_LOG_QID:
  1133. if (task->queue_id) {
  1134. var.begin = task->queue_id;
  1135. var.len = strlen (var.begin);
  1136. }
  1137. else {
  1138. var.begin = undef;
  1139. var.len = sizeof (undef) - 1;
  1140. }
  1141. break;
  1142. case RSPAMD_LOG_USER:
  1143. if (task->user) {
  1144. var.begin = task->user;
  1145. var.len = strlen (var.begin);
  1146. }
  1147. else {
  1148. var.begin = undef;
  1149. var.len = sizeof (undef) - 1;
  1150. }
  1151. break;
  1152. case RSPAMD_LOG_IP:
  1153. if (task->from_addr && rspamd_ip_is_valid (task->from_addr)) {
  1154. var.begin = rspamd_inet_address_to_string (task->from_addr);
  1155. var.len = strlen (var.begin);
  1156. }
  1157. else {
  1158. var.begin = undef;
  1159. var.len = sizeof (undef) - 1;
  1160. }
  1161. break;
  1162. /* Numeric vars */
  1163. case RSPAMD_LOG_LEN:
  1164. var.len = rspamd_snprintf (numbuf, sizeof (numbuf), "%uz",
  1165. task->msg.len);
  1166. var.begin = numbuf;
  1167. break;
  1168. case RSPAMD_LOG_DNS_REQ:
  1169. var.len = rspamd_snprintf (numbuf, sizeof (numbuf), "%uD",
  1170. task->dns_requests);
  1171. var.begin = numbuf;
  1172. break;
  1173. case RSPAMD_LOG_TIME_REAL:
  1174. var.begin = rspamd_log_check_time (task->task_timestamp,
  1175. task->time_real_finish,
  1176. task->cfg->clock_res);
  1177. var.len = strlen (var.begin);
  1178. break;
  1179. case RSPAMD_LOG_TIME_VIRTUAL:
  1180. var.begin = rspamd_log_check_time (task->task_timestamp,
  1181. task->time_real_finish,
  1182. task->cfg->clock_res);
  1183. var.len = strlen (var.begin);
  1184. break;
  1185. /* InternetAddress vars */
  1186. case RSPAMD_LOG_SMTP_FROM:
  1187. if (task->from_envelope) {
  1188. var.begin = task->from_envelope->addr;
  1189. var.len = task->from_envelope->addr_len;
  1190. }
  1191. break;
  1192. case RSPAMD_LOG_MIME_FROM:
  1193. if (MESSAGE_FIELD_CHECK (task, from_mime)) {
  1194. return rspamd_task_write_ialist (task,
  1195. MESSAGE_FIELD (task, from_mime),
  1196. 1,
  1197. lf,
  1198. logbuf);
  1199. }
  1200. break;
  1201. case RSPAMD_LOG_SMTP_RCPT:
  1202. if (task->rcpt_envelope) {
  1203. return rspamd_task_write_addr_list (task, task->rcpt_envelope, 1, lf,
  1204. logbuf);
  1205. }
  1206. break;
  1207. case RSPAMD_LOG_MIME_RCPT:
  1208. if (MESSAGE_FIELD_CHECK (task, rcpt_mime)) {
  1209. return rspamd_task_write_ialist (task,
  1210. MESSAGE_FIELD (task, rcpt_mime),
  1211. 1,
  1212. lf,
  1213. logbuf);
  1214. }
  1215. break;
  1216. case RSPAMD_LOG_SMTP_RCPTS:
  1217. if (task->rcpt_envelope) {
  1218. return rspamd_task_write_addr_list (task, task->rcpt_envelope, -1, lf,
  1219. logbuf);
  1220. }
  1221. break;
  1222. case RSPAMD_LOG_MIME_RCPTS:
  1223. if (MESSAGE_FIELD_CHECK (task, rcpt_mime)) {
  1224. return rspamd_task_write_ialist (task,
  1225. MESSAGE_FIELD (task, rcpt_mime),
  1226. -1, /* All addresses */
  1227. lf,
  1228. logbuf);
  1229. }
  1230. break;
  1231. case RSPAMD_LOG_DIGEST:
  1232. if (task->message) {
  1233. var.len = rspamd_snprintf (numbuf, sizeof (numbuf), "%*xs",
  1234. (gint) sizeof (MESSAGE_FIELD (task, digest)),
  1235. MESSAGE_FIELD (task, digest));
  1236. var.begin = numbuf;
  1237. }
  1238. else {
  1239. var.begin = undef;
  1240. var.len = sizeof (undef) - 1;
  1241. }
  1242. break;
  1243. case RSPAMD_LOG_FILENAME:
  1244. if (task->msg.fpath) {
  1245. var.len = strlen (task->msg.fpath);
  1246. var.begin = task->msg.fpath;
  1247. }
  1248. else {
  1249. var.begin = undef;
  1250. var.len = sizeof (undef) - 1;
  1251. }
  1252. break;
  1253. case RSPAMD_LOG_FORCED_ACTION:
  1254. if (task->result->passthrough_result) {
  1255. struct rspamd_passthrough_result *pr = task->result->passthrough_result;
  1256. if (!isnan (pr->target_score)) {
  1257. var.len = rspamd_snprintf (numbuf, sizeof (numbuf),
  1258. "%s \"%s\"; score=%.2f (set by %s)",
  1259. pr->action->name,
  1260. pr->message,
  1261. pr->target_score,
  1262. pr->module);
  1263. }
  1264. else {
  1265. var.len = rspamd_snprintf (numbuf, sizeof (numbuf),
  1266. "%s \"%s\"; score=nan (set by %s)",
  1267. pr->action->name,
  1268. pr->message,
  1269. pr->module);
  1270. }
  1271. var.begin = numbuf;
  1272. }
  1273. else {
  1274. var.begin = undef;
  1275. var.len = sizeof (undef) - 1;
  1276. }
  1277. break;
  1278. case RSPAMD_LOG_SETTINGS_ID:
  1279. if (task->settings_elt) {
  1280. var.begin = task->settings_elt->name;
  1281. var.len = strlen (task->settings_elt->name);
  1282. }
  1283. else {
  1284. var.begin = undef;
  1285. var.len = sizeof (undef) - 1;
  1286. }
  1287. break;
  1288. default:
  1289. var = rspamd_task_log_metric_res (task, lf);
  1290. break;
  1291. }
  1292. if (var.len > 0) {
  1293. res = rspamd_task_log_write_var (task, logbuf,
  1294. &var, (const rspamd_ftok_t *)lf->data);
  1295. }
  1296. return res;
  1297. }
  1298. void
  1299. rspamd_task_write_log (struct rspamd_task *task)
  1300. {
  1301. rspamd_fstring_t *logbuf;
  1302. struct rspamd_log_format *lf;
  1303. struct rspamd_task **ptask;
  1304. const gchar *lua_str;
  1305. gsize lua_str_len;
  1306. lua_State *L;
  1307. g_assert (task != NULL);
  1308. if (task->cfg->log_format == NULL ||
  1309. (task->flags & RSPAMD_TASK_FLAG_NO_LOG)) {
  1310. msg_debug_task ("skip logging due to no log flag");
  1311. return;
  1312. }
  1313. logbuf = rspamd_fstring_sized_new (1000);
  1314. DL_FOREACH (task->cfg->log_format, lf) {
  1315. switch (lf->type) {
  1316. case RSPAMD_LOG_STRING:
  1317. logbuf = rspamd_fstring_append (logbuf, lf->data, lf->len);
  1318. break;
  1319. case RSPAMD_LOG_LUA:
  1320. L = task->cfg->lua_state;
  1321. lua_rawgeti (L, LUA_REGISTRYINDEX, GPOINTER_TO_INT (lf->data));
  1322. ptask = lua_newuserdata (L, sizeof (*ptask));
  1323. rspamd_lua_setclass (L, "rspamd{task}", -1);
  1324. *ptask = task;
  1325. if (lua_pcall (L, 1, 1, 0) != 0) {
  1326. msg_err_task ("call to log function failed: %s",
  1327. lua_tostring (L, -1));
  1328. lua_pop (L, 1);
  1329. }
  1330. else {
  1331. lua_str = lua_tolstring (L, -1, &lua_str_len);
  1332. if (lua_str != NULL) {
  1333. logbuf = rspamd_fstring_append (logbuf, lua_str, lua_str_len);
  1334. }
  1335. lua_pop (L, 1);
  1336. }
  1337. break;
  1338. default:
  1339. /* We have a variable in log format */
  1340. if (lf->flags & RSPAMD_LOG_FMT_FLAG_CONDITION) {
  1341. if (!rspamd_task_log_check_condition (task, lf)) {
  1342. continue;
  1343. }
  1344. }
  1345. logbuf = rspamd_task_log_variable (task, lf, logbuf);
  1346. break;
  1347. }
  1348. }
  1349. msg_notice_task ("%V", logbuf);
  1350. rspamd_fstring_free (logbuf);
  1351. }
  1352. gdouble
  1353. rspamd_task_get_required_score (struct rspamd_task *task, struct rspamd_scan_result *m)
  1354. {
  1355. gint i;
  1356. if (m == NULL) {
  1357. m = task->result;
  1358. if (m == NULL) {
  1359. return NAN;
  1360. }
  1361. }
  1362. for (i = m->nactions - 1; i >= 0; i --) {
  1363. struct rspamd_action_result *action_lim = &m->actions_limits[i];
  1364. if (!isnan (action_lim->cur_limit) &&
  1365. !(action_lim->action->flags & (RSPAMD_ACTION_NO_THRESHOLD|RSPAMD_ACTION_HAM))) {
  1366. return m->actions_limits[i].cur_limit;
  1367. }
  1368. }
  1369. return NAN;
  1370. }
  1371. rspamd_ftok_t *
  1372. rspamd_task_get_request_header (struct rspamd_task *task,
  1373. const gchar *name)
  1374. {
  1375. struct rspamd_request_header_chain *ret =
  1376. rspamd_task_get_request_header_multiple (task, name);
  1377. if (ret) {
  1378. return ret->hdr;
  1379. }
  1380. return NULL;
  1381. }
  1382. struct rspamd_request_header_chain *
  1383. rspamd_task_get_request_header_multiple (struct rspamd_task *task,
  1384. const gchar *name)
  1385. {
  1386. struct rspamd_request_header_chain *ret = NULL;
  1387. rspamd_ftok_t srch;
  1388. khiter_t k;
  1389. srch.begin = (gchar *)name;
  1390. srch.len = strlen (name);
  1391. k = kh_get (rspamd_req_headers_hash, task->request_headers,
  1392. &srch);
  1393. if (k != kh_end (task->request_headers)) {
  1394. ret = kh_value (task->request_headers, k);
  1395. }
  1396. return ret;
  1397. }
  1398. void
  1399. rspamd_task_add_request_header (struct rspamd_task *task,
  1400. rspamd_ftok_t *name, rspamd_ftok_t *value)
  1401. {
  1402. khiter_t k;
  1403. gint res;
  1404. struct rspamd_request_header_chain *chain, *nchain;
  1405. k = kh_put (rspamd_req_headers_hash, task->request_headers,
  1406. name, &res);
  1407. if (res == 0) {
  1408. /* Existing name */
  1409. nchain = rspamd_mempool_alloc (task->task_pool, sizeof (*nchain));
  1410. nchain->hdr = value;
  1411. nchain->next = NULL;
  1412. chain = kh_value (task->request_headers, k);
  1413. /* Slow but OK here */
  1414. LL_APPEND (chain, nchain);
  1415. }
  1416. else {
  1417. nchain = rspamd_mempool_alloc (task->task_pool, sizeof (*nchain));
  1418. nchain->hdr = value;
  1419. nchain->next = NULL;
  1420. kh_value (task->request_headers, k) = nchain;
  1421. }
  1422. }
  1423. void
  1424. rspamd_task_profile_set (struct rspamd_task *task, const gchar *key,
  1425. gdouble value)
  1426. {
  1427. GHashTable *tbl;
  1428. gdouble *pval;
  1429. if (key == NULL) {
  1430. return;
  1431. }
  1432. tbl = rspamd_mempool_get_variable (task->task_pool, RSPAMD_MEMPOOL_PROFILE);
  1433. if (tbl == NULL) {
  1434. tbl = g_hash_table_new (rspamd_str_hash, rspamd_str_equal);
  1435. rspamd_mempool_set_variable (task->task_pool, RSPAMD_MEMPOOL_PROFILE,
  1436. tbl, (rspamd_mempool_destruct_t)g_hash_table_unref);
  1437. }
  1438. pval = g_hash_table_lookup (tbl, key);
  1439. if (pval == NULL) {
  1440. pval = rspamd_mempool_alloc (task->task_pool, sizeof (*pval));
  1441. *pval = value;
  1442. g_hash_table_insert (tbl, (void *)key, pval);
  1443. }
  1444. else {
  1445. *pval = value;
  1446. }
  1447. }
  1448. gdouble*
  1449. rspamd_task_profile_get (struct rspamd_task *task, const gchar *key)
  1450. {
  1451. GHashTable *tbl;
  1452. gdouble *pval = NULL;
  1453. tbl = rspamd_mempool_get_variable (task->task_pool, RSPAMD_MEMPOOL_PROFILE);
  1454. if (tbl != NULL) {
  1455. pval = g_hash_table_lookup (tbl, key);
  1456. }
  1457. return pval;
  1458. }
  1459. gboolean
  1460. rspamd_task_set_finish_time (struct rspamd_task *task)
  1461. {
  1462. if (isnan (task->time_real_finish)) {
  1463. task->time_real_finish = ev_time ();
  1464. return TRUE;
  1465. }
  1466. return FALSE;
  1467. }
  1468. const gchar *
  1469. rspamd_task_stage_name (enum rspamd_task_stage stg)
  1470. {
  1471. const gchar *ret = "unknown stage";
  1472. switch (stg) {
  1473. case RSPAMD_TASK_STAGE_CONNECT:
  1474. ret = "connect";
  1475. break;
  1476. case RSPAMD_TASK_STAGE_ENVELOPE:
  1477. ret = "envelope";
  1478. break;
  1479. case RSPAMD_TASK_STAGE_READ_MESSAGE:
  1480. ret = "read_message";
  1481. break;
  1482. case RSPAMD_TASK_STAGE_PRE_FILTERS:
  1483. ret = "prefilters";
  1484. break;
  1485. case RSPAMD_TASK_STAGE_PROCESS_MESSAGE:
  1486. ret = "process_message";
  1487. break;
  1488. case RSPAMD_TASK_STAGE_FILTERS:
  1489. ret = "filters";
  1490. break;
  1491. case RSPAMD_TASK_STAGE_CLASSIFIERS_PRE:
  1492. ret = "classifiers_pre";
  1493. break;
  1494. case RSPAMD_TASK_STAGE_CLASSIFIERS:
  1495. ret = "classifiers";
  1496. break;
  1497. case RSPAMD_TASK_STAGE_CLASSIFIERS_POST:
  1498. ret = "classifiers_post";
  1499. break;
  1500. case RSPAMD_TASK_STAGE_COMPOSITES:
  1501. ret = "composites";
  1502. break;
  1503. case RSPAMD_TASK_STAGE_POST_FILTERS:
  1504. ret = "postfilters";
  1505. break;
  1506. case RSPAMD_TASK_STAGE_LEARN_PRE:
  1507. ret = "learn_pre";
  1508. break;
  1509. case RSPAMD_TASK_STAGE_LEARN:
  1510. ret = "learn";
  1511. break;
  1512. case RSPAMD_TASK_STAGE_LEARN_POST:
  1513. ret = "learn_post";
  1514. break;
  1515. case RSPAMD_TASK_STAGE_COMPOSITES_POST:
  1516. ret = "composites_post";
  1517. break;
  1518. case RSPAMD_TASK_STAGE_IDEMPOTENT:
  1519. ret = "idempotent";
  1520. break;
  1521. case RSPAMD_TASK_STAGE_DONE:
  1522. ret = "done";
  1523. break;
  1524. case RSPAMD_TASK_STAGE_REPLIED:
  1525. ret = "replied";
  1526. break;
  1527. default:
  1528. break;
  1529. }
  1530. return ret;
  1531. }