You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

roll_history.c 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. /*-
  2. * Copyright 2016 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rspamd.h"
  18. #include "libmime/message.h"
  19. #include "lua/lua_common.h"
  20. #include "unix-std.h"
  21. #include "cfg_file_private.h"
  /*
   * Magic bytes marking the old history file format. The loader compares the
   * first bytes of a file against this array and rejects files that match.
   * The array carries no NUL terminator: its sizeof() is used as the magic
   * length.
   */
  22. static const char rspamd_history_magic_old[] = {'r', 's', 'h', '1'};
  23. /**
  24. * Returns new roll history
  25. * @param pool pool for shared memory
  26. * @return new structure
  27. */
  28. struct roll_history *
  29. rspamd_roll_history_new(rspamd_mempool_t *pool, unsigned int max_rows,
  30. struct rspamd_config *cfg)
  31. {
  32. struct roll_history *history;
  33. lua_State *L = cfg->lua_state;
  34. if (pool == NULL || max_rows == 0) {
  35. return NULL;
  36. }
  37. history = rspamd_mempool_alloc0_shared(pool, sizeof(struct roll_history));
  38. /*
  39. * Here, we check if there is any plugin that handles history,
  40. * in this case, we disable this code completely
  41. */
  42. lua_getglobal(L, "rspamd_plugins");
  43. if (lua_istable(L, -1)) {
  44. lua_pushstring(L, "history");
  45. lua_gettable(L, -2);
  46. if (lua_istable(L, -1)) {
  47. history->disabled = TRUE;
  48. }
  49. lua_pop(L, 1);
  50. }
  51. lua_pop(L, 1);
  52. if (!history->disabled) {
  53. history->rows = rspamd_mempool_alloc0_shared(pool,
  54. sizeof(struct roll_history_row) * max_rows);
  55. history->nrows = max_rows;
  56. }
  57. return history;
  58. }
  /*
   * Write-cursor state handed to roll_history_symbols_callback() while symbol
   * names are appended to a fixed-size buffer.
   */
  59. struct history_metric_callback_data {
  60. char *pos; /* position where the next symbol name will be written */
  61. int remain; /* space still available at pos, in bytes */
  62. };
  63. static void
  64. roll_history_symbols_callback(gpointer key, gpointer value, void *user_data)
  65. {
  66. struct history_metric_callback_data *cb = user_data;
  67. struct rspamd_symbol_result *s = value;
  68. unsigned int wr;
  69. if (s->flags & RSPAMD_SYMBOL_RESULT_IGNORED) {
  70. return;
  71. }
  72. if (cb->remain > 0) {
  73. wr = rspamd_snprintf(cb->pos, cb->remain, "%s, ", s->name);
  74. cb->pos += wr;
  75. cb->remain -= wr;
  76. }
  77. }
  78. /**
  79. * Update roll history with data from task
  80. * @param history roll history object
  81. * @param task task object
  82. */
  83. void rspamd_roll_history_update(struct roll_history *history,
  84. struct rspamd_task *task)
  85. {
  86. unsigned int row_num;
  87. struct roll_history_row *row;
  88. struct rspamd_scan_result *metric_res;
  89. struct history_metric_callback_data cbdata;
  90. struct rspamd_action *action;
  91. if (history->disabled) {
  92. return;
  93. }
  94. /* First of all obtain check and obtain row number */
  95. g_atomic_int_compare_and_exchange(&history->cur_row, history->nrows, 0);
  96. #if ((GLIB_MAJOR_VERSION == 2) && (GLIB_MINOR_VERSION > 30))
  97. row_num = g_atomic_int_add(&history->cur_row, 1);
  98. #else
  99. row_num = g_atomic_int_exchange_and_add(&history->cur_row, 1);
  100. #endif
  101. if (row_num < history->nrows) {
  102. row = &history->rows[row_num];
  103. g_atomic_int_set(&row->completed, FALSE);
  104. }
  105. else {
  106. /* Race condition */
  107. history->cur_row = 0;
  108. return;
  109. }
  110. /* Add information from task to roll history */
  111. if (task->from_addr) {
  112. rspamd_strlcpy(row->from_addr,
  113. rspamd_inet_address_to_string(task->from_addr),
  114. sizeof(row->from_addr));
  115. }
  116. else {
  117. rspamd_strlcpy(row->from_addr, "unknown", sizeof(row->from_addr));
  118. }
  119. row->timestamp = task->task_timestamp;
  120. /* Strings */
  121. if (task->message) {
  122. rspamd_strlcpy(row->message_id, MESSAGE_FIELD(task, message_id),
  123. sizeof(row->message_id));
  124. }
  125. if (task->auth_user) {
  126. rspamd_strlcpy(row->user, task->auth_user, sizeof(row->user));
  127. }
  128. else {
  129. row->user[0] = '\0';
  130. }
  131. /* Get default metric */
  132. metric_res = task->result;
  133. if (metric_res == NULL) {
  134. row->symbols[0] = '\0';
  135. row->action = METRIC_ACTION_NOACTION;
  136. }
  137. else {
  138. row->score = metric_res->score;
  139. action = rspamd_check_action_metric(task, NULL, NULL);
  140. row->action = action->action_type;
  141. row->required_score = rspamd_task_get_required_score(task, metric_res);
  142. cbdata.pos = row->symbols;
  143. cbdata.remain = sizeof(row->symbols);
  144. rspamd_task_symbol_result_foreach(task, NULL,
  145. roll_history_symbols_callback,
  146. &cbdata);
  147. if (cbdata.remain > 0) {
  148. /* Remove last whitespace and comma */
  149. *cbdata.pos-- = '\0';
  150. *cbdata.pos-- = '\0';
  151. *cbdata.pos = '\0';
  152. }
  153. }
  154. row->scan_time = task->time_real_finish - task->task_timestamp;
  155. row->len = task->msg.len;
  156. g_atomic_int_set(&row->completed, TRUE);
  157. }
  158. /**
  159. * Load previously saved history from file
  160. * @param history roll history object
  161. * @param filename filename to load from
  162. * @return TRUE if history has been loaded
  163. */
  164. gboolean
  165. rspamd_roll_history_load(struct roll_history *history, const char *filename)
  166. {
  167. int fd;
  168. struct stat st;
  169. char magic[sizeof(rspamd_history_magic_old)];
  170. ucl_object_t *top;
  171. const ucl_object_t *cur, *elt;
  172. struct ucl_parser *parser;
  173. struct roll_history_row *row;
  174. unsigned int n, i;
  175. g_assert(history != NULL);
  176. if (history->disabled) {
  177. return TRUE;
  178. }
  179. if (stat(filename, &st) == -1) {
  180. msg_info("cannot load history from %s: %s", filename,
  181. strerror(errno));
  182. return FALSE;
  183. }
  184. if ((fd = open(filename, O_RDONLY)) == -1) {
  185. msg_info("cannot load history from %s: %s", filename,
  186. strerror(errno));
  187. return FALSE;
  188. }
  189. /* Check for old format */
  190. if (read(fd, magic, sizeof(magic)) == -1) {
  191. close(fd);
  192. msg_info("cannot read history from %s: %s", filename,
  193. strerror(errno));
  194. return FALSE;
  195. }
  196. if (memcmp(magic, rspamd_history_magic_old, sizeof(magic)) == 0) {
  197. close(fd);
  198. msg_warn("cannot read history from old format %s, "
  199. "it will be replaced after restart",
  200. filename);
  201. return FALSE;
  202. }
  203. parser = ucl_parser_new(0);
  204. if (!ucl_parser_add_fd(parser, fd)) {
  205. msg_warn("cannot parse history file %s: %s", filename,
  206. ucl_parser_get_error(parser));
  207. ucl_parser_free(parser);
  208. close(fd);
  209. return FALSE;
  210. }
  211. top = ucl_parser_get_object(parser);
  212. ucl_parser_free(parser);
  213. close(fd);
  214. if (top == NULL) {
  215. msg_warn("cannot parse history file %s: no object", filename);
  216. return FALSE;
  217. }
  218. if (ucl_object_type(top) != UCL_ARRAY) {
  219. msg_warn("invalid object type read from: %s", filename);
  220. ucl_object_unref(top);
  221. return FALSE;
  222. }
  223. if (top->len > history->nrows) {
  224. msg_warn("stored history is larger than the current one: %ud (file) vs "
  225. "%ud (history)",
  226. top->len, history->nrows);
  227. n = history->nrows;
  228. }
  229. else if (top->len < history->nrows) {
  230. msg_warn(
  231. "stored history is smaller than the current one: %ud (file) vs "
  232. "%ud (history)",
  233. top->len, history->nrows);
  234. n = top->len;
  235. }
  236. else {
  237. n = top->len;
  238. }
  239. for (i = 0; i < n; i++) {
  240. cur = ucl_array_find_index(top, i);
  241. if (cur != NULL && ucl_object_type(cur) == UCL_OBJECT) {
  242. row = &history->rows[i];
  243. memset(row, 0, sizeof(*row));
  244. elt = ucl_object_lookup(cur, "time");
  245. if (elt && ucl_object_type(elt) == UCL_FLOAT) {
  246. row->timestamp = ucl_object_todouble(elt);
  247. }
  248. elt = ucl_object_lookup(cur, "id");
  249. if (elt && ucl_object_type(elt) == UCL_STRING) {
  250. rspamd_strlcpy(row->message_id, ucl_object_tostring(elt),
  251. sizeof(row->message_id));
  252. }
  253. elt = ucl_object_lookup(cur, "symbols");
  254. if (elt && ucl_object_type(elt) == UCL_STRING) {
  255. rspamd_strlcpy(row->symbols, ucl_object_tostring(elt),
  256. sizeof(row->symbols));
  257. }
  258. elt = ucl_object_lookup(cur, "user");
  259. if (elt && ucl_object_type(elt) == UCL_STRING) {
  260. rspamd_strlcpy(row->user, ucl_object_tostring(elt),
  261. sizeof(row->user));
  262. }
  263. elt = ucl_object_lookup(cur, "from");
  264. if (elt && ucl_object_type(elt) == UCL_STRING) {
  265. rspamd_strlcpy(row->from_addr, ucl_object_tostring(elt),
  266. sizeof(row->from_addr));
  267. }
  268. elt = ucl_object_lookup(cur, "len");
  269. if (elt && ucl_object_type(elt) == UCL_INT) {
  270. row->len = ucl_object_toint(elt);
  271. }
  272. elt = ucl_object_lookup(cur, "scan_time");
  273. if (elt && ucl_object_type(elt) == UCL_FLOAT) {
  274. row->scan_time = ucl_object_todouble(elt);
  275. }
  276. elt = ucl_object_lookup(cur, "score");
  277. if (elt && ucl_object_type(elt) == UCL_FLOAT) {
  278. row->score = ucl_object_todouble(elt);
  279. }
  280. elt = ucl_object_lookup(cur, "required_score");
  281. if (elt && ucl_object_type(elt) == UCL_FLOAT) {
  282. row->required_score = ucl_object_todouble(elt);
  283. }
  284. elt = ucl_object_lookup(cur, "action");
  285. if (elt && ucl_object_type(elt) == UCL_INT) {
  286. row->action = ucl_object_toint(elt);
  287. }
  288. row->completed = TRUE;
  289. }
  290. }
  291. ucl_object_unref(top);
  292. history->cur_row = n;
  293. return TRUE;
  294. }
  295. /**
  296. * Save history to file
  297. * @param history roll history object
  298. * @param filename filename to load from
  299. * @return TRUE if history has been saved
  300. */
  301. gboolean
  302. rspamd_roll_history_save(struct roll_history *history, const char *filename)
  303. {
  304. int fd;
  305. FILE *fp;
  306. ucl_object_t *obj, *elt;
  307. unsigned int i;
  308. struct roll_history_row *row;
  309. struct ucl_emitter_functions *emitter_func;
  310. g_assert(history != NULL);
  311. if (history->disabled) {
  312. return TRUE;
  313. }
  314. if ((fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 00600)) == -1) {
  315. msg_info("cannot save history to %s: %s", filename, strerror(errno));
  316. return FALSE;
  317. }
  318. fp = fdopen(fd, "w");
  319. obj = ucl_object_typed_new(UCL_ARRAY);
  320. for (i = 0; i < history->nrows; i++) {
  321. row = &history->rows[i];
  322. if (!row->completed) {
  323. continue;
  324. }
  325. elt = ucl_object_typed_new(UCL_OBJECT);
  326. ucl_object_insert_key(elt, ucl_object_fromdouble(row->timestamp),
  327. "time", 0, false);
  328. ucl_object_insert_key(elt, ucl_object_fromstring(row->message_id),
  329. "id", 0, false);
  330. ucl_object_insert_key(elt, ucl_object_fromstring(row->symbols),
  331. "symbols", 0, false);
  332. ucl_object_insert_key(elt, ucl_object_fromstring(row->user),
  333. "user", 0, false);
  334. ucl_object_insert_key(elt, ucl_object_fromstring(row->from_addr),
  335. "from", 0, false);
  336. ucl_object_insert_key(elt, ucl_object_fromint(row->len),
  337. "len", 0, false);
  338. ucl_object_insert_key(elt, ucl_object_fromdouble(row->scan_time),
  339. "scan_time", 0, false);
  340. ucl_object_insert_key(elt, ucl_object_fromdouble(row->score),
  341. "score", 0, false);
  342. ucl_object_insert_key(elt, ucl_object_fromdouble(row->required_score),
  343. "required_score", 0, false);
  344. ucl_object_insert_key(elt, ucl_object_fromint(row->action),
  345. "action", 0, false);
  346. ucl_array_append(obj, elt);
  347. }
  348. emitter_func = ucl_object_emit_file_funcs(fp);
  349. ucl_object_emit_full(obj, UCL_EMIT_JSON_COMPACT, emitter_func, NULL);
  350. ucl_object_emit_funcs_free(emitter_func);
  351. ucl_object_unref(obj);
  352. fclose(fp);
  353. return TRUE;
  354. }