You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago

  1. /*
  2. * Copyright 2024 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rrd.h"
  18. #include "util.h"
  19. #include "cfg_file.h"
  20. #include "logger.h"
  21. #include "unix-std.h"
  22. #include "cryptobox.h"
  23. #include <math.h>
  24. #define RSPAMD_RRD_DS_COUNT METRIC_ACTION_MAX
  25. #define RSPAMD_RRD_OLD_DS_COUNT 4
  26. #define RSPAMD_RRD_RRA_COUNT 4
  27. #define msg_err_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
  28. "rrd", file->id, \
  29. G_STRFUNC, \
  30. __VA_ARGS__)
  31. #define msg_warn_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \
  32. "rrd", file->id, \
  33. G_STRFUNC, \
  34. __VA_ARGS__)
  35. #define msg_info_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
  36. "rrd", file->id, \
  37. G_STRFUNC, \
  38. __VA_ARGS__)
  39. #define msg_debug_rrd(...) rspamd_conditional_debug_fast(NULL, NULL, \
  40. rspamd_rrd_log_id, "rrd", file->id, \
  41. G_STRFUNC, \
  42. __VA_ARGS__)
  43. INIT_LOG_MODULE(rrd)
  44. static GQuark
  45. rrd_error_quark(void)
  46. {
  47. return g_quark_from_static_string("rrd-error");
  48. }
  49. /**
  50. * Convert rrd dst type from string to numeric value
  51. */
  52. enum rrd_dst_type
  53. rrd_dst_from_string(const char *str)
  54. {
  55. if (g_ascii_strcasecmp(str, "counter") == 0) {
  56. return RRD_DST_COUNTER;
  57. }
  58. else if (g_ascii_strcasecmp(str, "absolute") == 0) {
  59. return RRD_DST_ABSOLUTE;
  60. }
  61. else if (g_ascii_strcasecmp(str, "gauge") == 0) {
  62. return RRD_DST_GAUGE;
  63. }
  64. else if (g_ascii_strcasecmp(str, "cdef") == 0) {
  65. return RRD_DST_CDEF;
  66. }
  67. else if (g_ascii_strcasecmp(str, "derive") == 0) {
  68. return RRD_DST_DERIVE;
  69. }
  70. return RRD_DST_INVALID;
  71. }
  72. /**
  73. * Convert numeric presentation of dst to string
  74. */
  75. const char *
  76. rrd_dst_to_string(enum rrd_dst_type type)
  77. {
  78. switch (type) {
  79. case RRD_DST_COUNTER:
  80. return "COUNTER";
  81. case RRD_DST_ABSOLUTE:
  82. return "ABSOLUTE";
  83. case RRD_DST_GAUGE:
  84. return "GAUGE";
  85. case RRD_DST_CDEF:
  86. return "CDEF";
  87. case RRD_DST_DERIVE:
  88. return "DERIVE";
  89. default:
  90. return "U";
  91. }
  92. return "U";
  93. }
  94. /**
  95. * Convert rrd consolidation function type from string to numeric value
  96. */
  97. enum rrd_cf_type
  98. rrd_cf_from_string(const char *str)
  99. {
  100. if (g_ascii_strcasecmp(str, "average") == 0) {
  101. return RRD_CF_AVERAGE;
  102. }
  103. else if (g_ascii_strcasecmp(str, "minimum") == 0) {
  104. return RRD_CF_MINIMUM;
  105. }
  106. else if (g_ascii_strcasecmp(str, "maximum") == 0) {
  107. return RRD_CF_MAXIMUM;
  108. }
  109. else if (g_ascii_strcasecmp(str, "last") == 0) {
  110. return RRD_CF_LAST;
  111. }
  112. /* XXX: add other CF functions supported by rrd */
  113. return RRD_CF_INVALID;
  114. }
  115. /**
  116. * Convert numeric presentation of cf to string
  117. */
  118. const char *
  119. rrd_cf_to_string(enum rrd_cf_type type)
  120. {
  121. switch (type) {
  122. case RRD_CF_AVERAGE:
  123. return "AVERAGE";
  124. case RRD_CF_MINIMUM:
  125. return "MINIMUM";
  126. case RRD_CF_MAXIMUM:
  127. return "MAXIMUM";
  128. case RRD_CF_LAST:
  129. return "LAST";
  130. default:
  131. return "U";
  132. }
  133. /* XXX: add other CF functions supported by rrd */
  134. return "U";
  135. }
  136. void rrd_make_default_rra(const char *cf_name,
  137. gulong pdp_cnt,
  138. gulong rows,
  139. struct rrd_rra_def *rra)
  140. {
  141. g_assert(cf_name != NULL);
  142. g_assert(rrd_cf_from_string(cf_name) != RRD_CF_INVALID);
  143. rra->pdp_cnt = pdp_cnt;
  144. rra->row_cnt = rows;
  145. rspamd_strlcpy(rra->cf_nam, cf_name, sizeof(rra->cf_nam));
  146. memset(rra->par, 0, sizeof(rra->par));
  147. rra->par[RRA_cdp_xff_val].dv = 0.5;
  148. }
  149. void rrd_make_default_ds(const char *name,
  150. const char *type,
  151. gulong pdp_step,
  152. struct rrd_ds_def *ds)
  153. {
  154. g_assert(name != NULL);
  155. g_assert(type != NULL);
  156. g_assert(rrd_dst_from_string(type) != RRD_DST_INVALID);
  157. rspamd_strlcpy(ds->ds_nam, name, sizeof(ds->ds_nam));
  158. rspamd_strlcpy(ds->dst, type, sizeof(ds->dst));
  159. memset(ds->par, 0, sizeof(ds->par));
  160. ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2;
  161. ds->par[RRD_DS_min_val].dv = NAN;
  162. ds->par[RRD_DS_max_val].dv = NAN;
  163. }
  164. /**
  165. * Check rrd file for correctness (size, cookies, etc)
  166. */
  167. static gboolean
  168. rspamd_rrd_check_file(const char *filename, gboolean need_data, GError **err)
  169. {
  170. int fd, i;
  171. struct stat st;
  172. struct rrd_file_head head;
  173. struct rrd_rra_def rra;
  174. int head_size;
  175. fd = open(filename, O_RDWR);
  176. if (fd == -1) {
  177. g_set_error(err,
  178. rrd_error_quark(), errno, "rrd open error: %s", strerror(errno));
  179. return FALSE;
  180. }
  181. if (fstat(fd, &st) == -1) {
  182. g_set_error(err,
  183. rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno));
  184. close(fd);
  185. return FALSE;
  186. }
  187. if (st.st_size < (goffset) sizeof(struct rrd_file_head)) {
  188. /* We have trimmed file */
  189. g_set_error(err, rrd_error_quark(), EINVAL, "rrd size is bad: %ud",
  190. (unsigned int) st.st_size);
  191. close(fd);
  192. return FALSE;
  193. }
  194. /* Try to read header */
  195. if (read(fd, &head, sizeof(head)) != sizeof(head)) {
  196. g_set_error(err,
  197. rrd_error_quark(), errno, "rrd read head error: %s",
  198. strerror(errno));
  199. close(fd);
  200. return FALSE;
  201. }
  202. /* Check magic */
  203. if (memcmp(head.version, RRD_VERSION, sizeof(head.version)) != 0) {
  204. g_set_error(err,
  205. rrd_error_quark(), EINVAL, "rrd head error: bad cookie");
  206. close(fd);
  207. return FALSE;
  208. }
  209. if (head.float_cookie != RRD_FLOAT_COOKIE) {
  210. g_set_error(err,
  211. rrd_error_quark(), EINVAL, "rrd head error: another architecture "
  212. "(file cookie %g != our cookie %g)",
  213. head.float_cookie, RRD_FLOAT_COOKIE);
  214. close(fd);
  215. return FALSE;
  216. }
  217. /* Check for other params */
  218. if (head.ds_cnt <= 0 || head.rra_cnt <= 0) {
  219. g_set_error(err,
  220. rrd_error_quark(), EINVAL, "rrd head cookies error: bad rra or ds count");
  221. close(fd);
  222. return FALSE;
  223. }
  224. /* Now we can calculate the overall size of rrd */
  225. head_size = sizeof(struct rrd_file_head) +
  226. sizeof(struct rrd_ds_def) * head.ds_cnt +
  227. sizeof(struct rrd_rra_def) * head.rra_cnt +
  228. sizeof(struct rrd_live_head) +
  229. sizeof(struct rrd_pdp_prep) * head.ds_cnt +
  230. sizeof(struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt +
  231. sizeof(struct rrd_rra_ptr) * head.rra_cnt;
  232. if (st.st_size < (goffset) head_size) {
  233. g_set_error(err,
  234. rrd_error_quark(), errno, "rrd file seems to have stripped header: %d",
  235. head_size);
  236. close(fd);
  237. return FALSE;
  238. }
  239. if (need_data) {
  240. /* Now check rra */
  241. if (lseek(fd, sizeof(struct rrd_ds_def) * head.ds_cnt,
  242. SEEK_CUR) == -1) {
  243. g_set_error(err,
  244. rrd_error_quark(), errno, "rrd head lseek error: %s",
  245. strerror(errno));
  246. close(fd);
  247. return FALSE;
  248. }
  249. for (i = 0; i < (int) head.rra_cnt; i++) {
  250. if (read(fd, &rra, sizeof(rra)) != sizeof(rra)) {
  251. g_set_error(err,
  252. rrd_error_quark(), errno, "rrd read rra error: %s",
  253. strerror(errno));
  254. close(fd);
  255. return FALSE;
  256. }
  257. head_size += rra.row_cnt * head.ds_cnt * sizeof(double);
  258. }
  259. if (st.st_size != head_size) {
  260. g_set_error(err,
  261. rrd_error_quark(), EINVAL, "rrd file seems to have incorrect size: %d, must be %d",
  262. (int) st.st_size, head_size);
  263. close(fd);
  264. return FALSE;
  265. }
  266. }
  267. close(fd);
  268. return TRUE;
  269. }
  270. /**
  271. * Adjust pointers in mmapped rrd file
  272. * @param file
  273. */
  274. static void
  275. rspamd_rrd_adjust_pointers(struct rspamd_rrd_file *file, gboolean completed)
  276. {
  277. uint8_t *ptr;
  278. ptr = file->map;
  279. file->stat_head = (struct rrd_file_head *) ptr;
  280. ptr += sizeof(struct rrd_file_head);
  281. file->ds_def = (struct rrd_ds_def *) ptr;
  282. ptr += sizeof(struct rrd_ds_def) * file->stat_head->ds_cnt;
  283. file->rra_def = (struct rrd_rra_def *) ptr;
  284. ptr += sizeof(struct rrd_rra_def) * file->stat_head->rra_cnt;
  285. file->live_head = (struct rrd_live_head *) ptr;
  286. ptr += sizeof(struct rrd_live_head);
  287. file->pdp_prep = (struct rrd_pdp_prep *) ptr;
  288. ptr += sizeof(struct rrd_pdp_prep) * file->stat_head->ds_cnt;
  289. file->cdp_prep = (struct rrd_cdp_prep *) ptr;
  290. ptr += sizeof(struct rrd_cdp_prep) * file->stat_head->rra_cnt *
  291. file->stat_head->ds_cnt;
  292. file->rra_ptr = (struct rrd_rra_ptr *) ptr;
  293. if (completed) {
  294. ptr += sizeof(struct rrd_rra_ptr) * file->stat_head->rra_cnt;
  295. file->rrd_value = (double *) ptr;
  296. }
  297. else {
  298. file->rrd_value = NULL;
  299. }
  300. }
  301. static void
  302. rspamd_rrd_calculate_checksum(struct rspamd_rrd_file *file)
  303. {
  304. unsigned char sigbuf[rspamd_cryptobox_HASHBYTES];
  305. struct rrd_ds_def *ds;
  306. unsigned int i;
  307. rspamd_cryptobox_hash_state_t st;
  308. if (file->finalized) {
  309. rspamd_cryptobox_hash_init(&st, NULL, 0);
  310. rspamd_cryptobox_hash_update(&st, file->filename, strlen(file->filename));
  311. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  312. ds = &file->ds_def[i];
  313. rspamd_cryptobox_hash_update(&st, ds->ds_nam, sizeof(ds->ds_nam));
  314. }
  315. rspamd_cryptobox_hash_final(&st, sigbuf);
  316. file->id = rspamd_encode_base32(sigbuf, sizeof(sigbuf), RSPAMD_BASE32_DEFAULT);
  317. }
  318. }
  319. static int
  320. rspamd_rrd_open_exclusive(const char *filename)
  321. {
  322. struct timespec sleep_ts = {
  323. .tv_sec = 0,
  324. .tv_nsec = 1000000};
  325. int fd;
  326. fd = open(filename, O_RDWR);
  327. if (fd == -1) {
  328. return -1;
  329. }
  330. for (;;) {
  331. if (rspamd_file_lock(fd, TRUE) == -1) {
  332. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  333. nanosleep(&sleep_ts, NULL);
  334. continue;
  335. }
  336. else {
  337. close(fd);
  338. return -1;
  339. }
  340. }
  341. else {
  342. break;
  343. }
  344. }
  345. return fd;
  346. };
  347. /**
  348. * Open completed or incompleted rrd file
  349. * @param filename
  350. * @param completed
  351. * @param err
  352. * @return
  353. */
  354. static struct rspamd_rrd_file *
  355. rspamd_rrd_open_common(const char *filename, gboolean completed, GError **err)
  356. {
  357. struct rspamd_rrd_file *file;
  358. int fd;
  359. struct stat st;
  360. if (!rspamd_rrd_check_file(filename, completed, err)) {
  361. return NULL;
  362. }
  363. file = g_malloc0(sizeof(struct rspamd_rrd_file));
  364. /* Open file */
  365. fd = rspamd_rrd_open_exclusive(filename);
  366. if (fd == -1) {
  367. g_set_error(err,
  368. rrd_error_quark(), errno, "rrd open error: %s", strerror(errno));
  369. g_free(file);
  370. return FALSE;
  371. }
  372. if (fstat(fd, &st) == -1) {
  373. g_set_error(err,
  374. rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno));
  375. rspamd_file_unlock(fd, FALSE);
  376. g_free(file);
  377. close(fd);
  378. return FALSE;
  379. }
  380. /* Mmap file */
  381. file->size = st.st_size;
  382. if ((file->map =
  383. mmap(NULL, st.st_size, PROT_READ | PROT_WRITE,
  384. MAP_SHARED, fd, 0)) == MAP_FAILED) {
  385. rspamd_file_unlock(fd, FALSE);
  386. close(fd);
  387. g_set_error(err,
  388. rrd_error_quark(), ENOMEM, "mmap failed: %s", strerror(errno));
  389. g_free(file);
  390. return NULL;
  391. }
  392. file->fd = fd;
  393. /* Adjust pointers */
  394. rspamd_rrd_adjust_pointers(file, completed);
  395. /* Mark it as finalized */
  396. file->finalized = completed;
  397. file->filename = g_strdup(filename);
  398. rspamd_rrd_calculate_checksum(file);
  399. return file;
  400. }
  401. /**
  402. * Open (and mmap) existing RRD file
  403. * @param filename path
  404. * @param err error pointer
  405. * @return rrd file structure
  406. */
  407. struct rspamd_rrd_file *
  408. rspamd_rrd_open(const char *filename, GError **err)
  409. {
  410. struct rspamd_rrd_file *file;
  411. if ((file = rspamd_rrd_open_common(filename, TRUE, err))) {
  412. msg_info_rrd("rrd file opened: %s", filename);
  413. }
  414. return file;
  415. }
  416. /**
  417. * Create basic header for rrd file
  418. * @param filename file path
  419. * @param ds_count number of data sources
  420. * @param rra_count number of round robin archives
  421. * @param pdp_step step of primary data points
  422. * @param err error pointer
  423. * @return TRUE if file has been created
  424. */
  425. struct rspamd_rrd_file *
  426. rspamd_rrd_create(const char *filename,
  427. gulong ds_count,
  428. gulong rra_count,
  429. gulong pdp_step,
  430. double initial_ticks,
  431. GError **err)
  432. {
  433. struct rspamd_rrd_file *new;
  434. struct rrd_file_head head;
  435. struct rrd_ds_def ds;
  436. struct rrd_rra_def rra;
  437. struct rrd_live_head lh;
  438. struct rrd_pdp_prep pdp;
  439. struct rrd_cdp_prep cdp;
  440. struct rrd_rra_ptr rra_ptr;
  441. int fd;
  442. unsigned int i, j;
  443. /* Open file */
  444. fd = open(filename, O_RDWR | O_CREAT | O_EXCL, 0644);
  445. if (fd == -1) {
  446. g_set_error(err,
  447. rrd_error_quark(), errno, "rrd create error: %s",
  448. strerror(errno));
  449. return NULL;
  450. }
  451. rspamd_file_lock(fd, FALSE);
  452. /* Fill header */
  453. memset(&head, 0, sizeof(head));
  454. head.rra_cnt = rra_count;
  455. head.ds_cnt = ds_count;
  456. head.pdp_step = pdp_step;
  457. memcpy(head.cookie, RRD_COOKIE, sizeof(head.cookie));
  458. memcpy(head.version, RRD_VERSION, sizeof(head.version));
  459. head.float_cookie = RRD_FLOAT_COOKIE;
  460. if (write(fd, &head, sizeof(head)) != sizeof(head)) {
  461. rspamd_file_unlock(fd, FALSE);
  462. close(fd);
  463. g_set_error(err,
  464. rrd_error_quark(), errno, "rrd write error: %s", strerror(errno));
  465. return NULL;
  466. }
  467. /* Fill DS section */
  468. memset(&ds, 0, sizeof(ds));
  469. memset(&ds.ds_nam, 0, sizeof(ds.ds_nam));
  470. memcpy(&ds.dst, "COUNTER", sizeof("COUNTER"));
  471. memset(&ds.par, 0, sizeof(ds.par));
  472. for (i = 0; i < ds_count; i++) {
  473. if (write(fd, &ds, sizeof(ds)) != sizeof(ds)) {
  474. rspamd_file_unlock(fd, FALSE);
  475. close(fd);
  476. g_set_error(err,
  477. rrd_error_quark(), errno, "rrd write error: %s",
  478. strerror(errno));
  479. return NULL;
  480. }
  481. }
  482. /* Fill RRA section */
  483. memset(&rra, 0, sizeof(rra));
  484. memcpy(&rra.cf_nam, "AVERAGE", sizeof("AVERAGE"));
  485. rra.pdp_cnt = 1;
  486. memset(&rra.par, 0, sizeof(rra.par));
  487. for (i = 0; i < rra_count; i++) {
  488. if (write(fd, &rra, sizeof(rra)) != sizeof(rra)) {
  489. rspamd_file_unlock(fd, FALSE);
  490. close(fd);
  491. g_set_error(err,
  492. rrd_error_quark(), errno, "rrd write error: %s",
  493. strerror(errno));
  494. return NULL;
  495. }
  496. }
  497. /* Fill live header */
  498. memset(&lh, 0, sizeof(lh));
  499. lh.last_up = (glong) initial_ticks;
  500. lh.last_up_usec = (glong) ((initial_ticks - lh.last_up) * 1e6f);
  501. if (write(fd, &lh, sizeof(lh)) != sizeof(lh)) {
  502. rspamd_file_unlock(fd, FALSE);
  503. close(fd);
  504. g_set_error(err,
  505. rrd_error_quark(), errno, "rrd write error: %s", strerror(errno));
  506. return NULL;
  507. }
  508. /* Fill pdp prep */
  509. memset(&pdp, 0, sizeof(pdp));
  510. memcpy(&pdp.last_ds, "U", sizeof("U"));
  511. memset(&pdp.scratch, 0, sizeof(pdp.scratch));
  512. pdp.scratch[PDP_val].dv = NAN;
  513. pdp.scratch[PDP_unkn_sec_cnt].lv = 0;
  514. for (i = 0; i < ds_count; i++) {
  515. if (write(fd, &pdp, sizeof(pdp)) != sizeof(pdp)) {
  516. rspamd_file_unlock(fd, FALSE);
  517. close(fd);
  518. g_set_error(err,
  519. rrd_error_quark(), errno, "rrd write error: %s",
  520. strerror(errno));
  521. return NULL;
  522. }
  523. }
  524. /* Fill cdp prep */
  525. memset(&cdp, 0, sizeof(cdp));
  526. memset(&cdp.scratch, 0, sizeof(cdp.scratch));
  527. cdp.scratch[CDP_val].dv = NAN;
  528. cdp.scratch[CDP_unkn_pdp_cnt].lv = 0;
  529. for (i = 0; i < rra_count; i++) {
  530. for (j = 0; j < ds_count; j++) {
  531. if (write(fd, &cdp, sizeof(cdp)) != sizeof(cdp)) {
  532. rspamd_file_unlock(fd, FALSE);
  533. close(fd);
  534. g_set_error(err,
  535. rrd_error_quark(), errno, "rrd write error: %s",
  536. strerror(errno));
  537. return NULL;
  538. }
  539. }
  540. }
  541. /* Set row pointers */
  542. memset(&rra_ptr, 0, sizeof(rra_ptr));
  543. for (i = 0; i < rra_count; i++) {
  544. if (write(fd, &rra_ptr, sizeof(rra_ptr)) != sizeof(rra_ptr)) {
  545. rspamd_file_unlock(fd, FALSE);
  546. close(fd);
  547. g_set_error(err,
  548. rrd_error_quark(), errno, "rrd write error: %s",
  549. strerror(errno));
  550. return NULL;
  551. }
  552. }
  553. rspamd_file_unlock(fd, FALSE);
  554. close(fd);
  555. new = rspamd_rrd_open_common(filename, FALSE, err);
  556. return new;
  557. }
  558. /**
  559. * Add data sources to rrd file
  560. * @param filename path to file
  561. * @param ds array of struct rrd_ds_def
  562. * @param err error pointer
  563. * @return TRUE if data sources were added
  564. */
  565. gboolean
  566. rspamd_rrd_add_ds(struct rspamd_rrd_file *file, GArray *ds, GError **err)
  567. {
  568. if (file == NULL || file->stat_head->ds_cnt * sizeof(struct rrd_ds_def) !=
  569. ds->len) {
  570. g_set_error(err,
  571. rrd_error_quark(), EINVAL, "rrd add ds failed: wrong arguments");
  572. return FALSE;
  573. }
  574. /* Straightforward memcpy */
  575. memcpy(file->ds_def, ds->data, ds->len);
  576. return TRUE;
  577. }
  578. /**
  579. * Add round robin archives to rrd file
  580. * @param filename path to file
  581. * @param ds array of struct rrd_rra_def
  582. * @param err error pointer
  583. * @return TRUE if archives were added
  584. */
  585. gboolean
  586. rspamd_rrd_add_rra(struct rspamd_rrd_file *file, GArray *rra, GError **err)
  587. {
  588. if (file == NULL || file->stat_head->rra_cnt *
  589. sizeof(struct rrd_rra_def) !=
  590. rra->len) {
  591. g_set_error(err,
  592. rrd_error_quark(), EINVAL, "rrd add rra failed: wrong arguments");
  593. return FALSE;
  594. }
  595. /* Straightforward memcpy */
  596. memcpy(file->rra_def, rra->data, rra->len);
  597. return TRUE;
  598. }
  599. /**
  600. * Finalize rrd file header and initialize all RRA in the file
  601. * @param filename file path
  602. * @param err error pointer
  603. * @return TRUE if rrd file is ready for use
  604. */
  605. gboolean
  606. rspamd_rrd_finalize(struct rspamd_rrd_file *file, GError **err)
  607. {
  608. int fd;
  609. unsigned int i;
  610. int count = 0;
  611. double vbuf[1024];
  612. struct stat st;
  613. if (file == NULL || file->filename == NULL || file->fd == -1) {
  614. g_set_error(err,
  615. rrd_error_quark(), EINVAL, "rrd add rra failed: wrong arguments");
  616. return FALSE;
  617. }
  618. fd = file->fd;
  619. if (lseek(fd, 0, SEEK_END) == -1) {
  620. g_set_error(err,
  621. rrd_error_quark(), errno, "rrd seek error: %s", strerror(errno));
  622. close(fd);
  623. return FALSE;
  624. }
  625. /* Adjust CDP */
  626. for (i = 0; i < file->stat_head->rra_cnt; i++) {
  627. file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv = 0;
  628. /* Randomize row pointer (disabled) */
  629. /* file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt; */
  630. file->rra_ptr->cur_row = file->rra_def[i].row_cnt - 1;
  631. /* Calculate values count */
  632. count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
  633. }
  634. munmap(file->map, file->size);
  635. /* Write values */
  636. for (i = 0; i < G_N_ELEMENTS(vbuf); i++) {
  637. vbuf[i] = NAN;
  638. }
  639. while (count > 0) {
  640. /* Write values in buffered matter */
  641. if (write(fd, vbuf,
  642. MIN((int) G_N_ELEMENTS(vbuf), count) * sizeof(double)) == -1) {
  643. g_set_error(err,
  644. rrd_error_quark(), errno, "rrd write error: %s",
  645. strerror(errno));
  646. close(fd);
  647. return FALSE;
  648. }
  649. count -= G_N_ELEMENTS(vbuf);
  650. }
  651. if (fstat(fd, &st) == -1) {
  652. g_set_error(err,
  653. rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno));
  654. close(fd);
  655. return FALSE;
  656. }
  657. /* Mmap again */
  658. file->size = st.st_size;
  659. if ((file->map =
  660. mmap(NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd,
  661. 0)) == MAP_FAILED) {
  662. close(fd);
  663. g_set_error(err,
  664. rrd_error_quark(), ENOMEM, "mmap failed: %s", strerror(errno));
  665. return FALSE;
  666. }
  667. /* Adjust pointers */
  668. rspamd_rrd_adjust_pointers(file, TRUE);
  669. file->finalized = TRUE;
  670. rspamd_rrd_calculate_checksum(file);
  671. msg_info_rrd("rrd file created: %s", file->filename);
  672. return TRUE;
  673. }
  674. /**
  675. * Update pdp_prep data
  676. * @param file rrd file
  677. * @param vals new values
  678. * @param pdp_new new pdp array
  679. * @param interval time elapsed from the last update
  680. * @return
  681. */
  682. static gboolean
  683. rspamd_rrd_update_pdp_prep(struct rspamd_rrd_file *file,
  684. double *vals,
  685. double *pdp_new,
  686. double interval)
  687. {
  688. unsigned int i;
  689. enum rrd_dst_type type;
  690. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  691. type = rrd_dst_from_string(file->ds_def[i].dst);
  692. if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv < interval) {
  693. rspamd_strlcpy(file->pdp_prep[i].last_ds, "U",
  694. sizeof(file->pdp_prep[i].last_ds));
  695. pdp_new[i] = NAN;
  696. msg_debug_rrd("adding unknown point interval %.3f is less than heartbeat %l",
  697. interval, file->ds_def[i].par[RRD_DS_mrhb_cnt].lv);
  698. }
  699. else {
  700. switch (type) {
  701. case RRD_DST_COUNTER:
  702. case RRD_DST_DERIVE:
  703. if (file->pdp_prep[i].last_ds[0] == 'U') {
  704. pdp_new[i] = NAN;
  705. msg_debug_rrd("last point is NaN for point %ud", i);
  706. }
  707. else {
  708. pdp_new[i] = vals[i] - strtod(file->pdp_prep[i].last_ds,
  709. NULL);
  710. msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]);
  711. }
  712. break;
  713. case RRD_DST_GAUGE:
  714. pdp_new[i] = vals[i] * interval;
  715. msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]);
  716. break;
  717. case RRD_DST_ABSOLUTE:
  718. pdp_new[i] = vals[i];
  719. msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]);
  720. break;
  721. default:
  722. return FALSE;
  723. }
  724. }
  725. /* Copy value to the last_ds */
  726. if (!isnan(vals[i])) {
  727. rspamd_snprintf(file->pdp_prep[i].last_ds,
  728. sizeof(file->pdp_prep[i].last_ds), "%.4f", vals[i]);
  729. }
  730. else {
  731. file->pdp_prep[i].last_ds[0] = 'U';
  732. file->pdp_prep[i].last_ds[1] = '\0';
  733. }
  734. }
  735. return TRUE;
  736. }
  737. /**
  738. * Update step for this pdp
  739. * @param file
  740. * @param pdp_new new pdp array
  741. * @param pdp_temp temp pdp array
  742. * @param interval time till last update
  743. * @param pre_int pre interval
  744. * @param post_int post intervall
  745. * @param pdp_diff time till last pdp update
  746. */
  747. static void
  748. rspamd_rrd_update_pdp_step(struct rspamd_rrd_file *file,
  749. double *pdp_new,
  750. double *pdp_temp,
  751. double interval,
  752. gulong pdp_diff)
  753. {
  754. unsigned int i;
  755. rrd_value_t *scratch;
  756. gulong heartbeat;
  757. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  758. scratch = file->pdp_prep[i].scratch;
  759. heartbeat = file->ds_def[i].par[RRD_DS_mrhb_cnt].lv;
  760. if (!isnan(pdp_new[i])) {
  761. if (isnan(scratch[PDP_val].dv)) {
  762. scratch[PDP_val].dv = 0;
  763. }
  764. }
  765. /* Check interval value for heartbeat for this DS */
  766. if ((interval > heartbeat) ||
  767. (file->stat_head->pdp_step / 2.0 < scratch[PDP_unkn_sec_cnt].lv)) {
  768. pdp_temp[i] = NAN;
  769. }
  770. else {
  771. pdp_temp[i] = scratch[PDP_val].dv /
  772. ((double) (pdp_diff - scratch[PDP_unkn_sec_cnt].lv));
  773. }
  774. if (isnan(pdp_new[i])) {
  775. scratch[PDP_unkn_sec_cnt].lv = interval;
  776. scratch[PDP_val].dv = NAN;
  777. }
  778. else {
  779. scratch[PDP_unkn_sec_cnt].lv = 0;
  780. scratch[PDP_val].dv = pdp_new[i] / interval;
  781. }
  782. msg_debug_rrd("new temp PDP %ud, %.3f -> %.3f, scratch: %3f",
  783. i, pdp_new[i], pdp_temp[i],
  784. scratch[PDP_val].dv);
  785. }
  786. }
  787. /**
  788. * Update CDP for this rra
  789. * @param file rrd file
  790. * @param pdp_steps how much pdp steps elapsed from the last update
  791. * @param pdp_offset offset from pdp
  792. * @param rra_steps how much steps must be updated for this rra
  793. * @param rra_index index of desired rra
  794. * @param pdp_temp temporary pdp points
  795. */
  796. static void
  797. rspamd_rrd_update_cdp(struct rspamd_rrd_file *file,
  798. double pdp_steps,
  799. double pdp_offset,
  800. gulong *rra_steps,
  801. gulong rra_index,
  802. double *pdp_temp)
  803. {
  804. unsigned int i;
  805. struct rrd_rra_def *rra;
  806. rrd_value_t *scratch;
  807. enum rrd_cf_type cf;
  808. double last_cdp = INFINITY, cur_cdp = INFINITY;
  809. gulong pdp_in_cdp;
  810. rra = &file->rra_def[rra_index];
  811. cf = rrd_cf_from_string(rra->cf_nam);
  812. /* Iterate over all DS for this RRA */
  813. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  814. /* Get CDP for this RRA and DS */
  815. scratch =
  816. file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch;
  817. if (rra->pdp_cnt > 1) {
  818. /* Do we have any CDP to update for this rra ? */
  819. if (rra_steps[rra_index] > 0) {
  820. if (isnan(pdp_temp[i])) {
  821. /* New pdp is nan */
  822. /* Increment unknown points count */
  823. scratch[CDP_unkn_pdp_cnt].lv += pdp_offset;
  824. /* Reset secondary value */
  825. scratch[CDP_secondary_val].dv = NAN;
  826. }
  827. else {
  828. scratch[CDP_secondary_val].dv = pdp_temp[i];
  829. }
  830. /* Check XFF for this rra */
  831. if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt *
  832. rra->par[RRA_cdp_xff_val].lv) {
  833. /* XFF is reached */
  834. scratch[CDP_primary_val].dv = NAN;
  835. }
  836. else {
  837. /* Need to initialize CDP using specified consolidation */
  838. switch (cf) {
  839. case RRD_CF_AVERAGE:
  840. last_cdp =
  841. isnan(scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val].dv;
  842. cur_cdp = isnan(pdp_temp[i]) ? 0.0 : pdp_temp[i];
  843. scratch[CDP_primary_val].dv =
  844. (last_cdp + cur_cdp *
  845. pdp_offset) /
  846. (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv);
  847. break;
  848. case RRD_CF_MAXIMUM:
  849. last_cdp =
  850. isnan(scratch[CDP_val].dv) ? -INFINITY : scratch[CDP_val].dv;
  851. cur_cdp = isnan(pdp_temp[i]) ? -INFINITY : pdp_temp[i];
  852. scratch[CDP_primary_val].dv = MAX(last_cdp, cur_cdp);
  853. break;
  854. case RRD_CF_MINIMUM:
  855. last_cdp =
  856. isnan(scratch[CDP_val].dv) ? INFINITY : scratch[CDP_val].dv;
  857. cur_cdp = isnan(pdp_temp[i]) ? INFINITY : pdp_temp[i];
  858. scratch[CDP_primary_val].dv = MIN(last_cdp, cur_cdp);
  859. break;
  860. case RRD_CF_LAST:
  861. default:
  862. scratch[CDP_primary_val].dv = pdp_temp[i];
  863. last_cdp = INFINITY;
  864. break;
  865. }
  866. }
  867. /* Init carry of this CDP */
  868. pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt;
  869. if (pdp_in_cdp == 0 || isnan(pdp_temp[i])) {
  870. /* Set overflow */
  871. switch (cf) {
  872. case RRD_CF_AVERAGE:
  873. scratch[CDP_val].dv = 0;
  874. break;
  875. case RRD_CF_MAXIMUM:
  876. scratch[CDP_val].dv = -INFINITY;
  877. break;
  878. case RRD_CF_MINIMUM:
  879. scratch[CDP_val].dv = INFINITY;
  880. break;
  881. default:
  882. scratch[CDP_val].dv = NAN;
  883. break;
  884. }
  885. }
  886. else {
  887. /* Special carry for average */
  888. if (cf == RRD_CF_AVERAGE) {
  889. scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp;
  890. }
  891. else {
  892. scratch[CDP_val].dv = pdp_temp[i];
  893. }
  894. }
  895. scratch[CDP_unkn_pdp_cnt].lv = 0;
  896. msg_debug_rrd("update cdp for DS %d with value %.3f, "
  897. "stored value: %.3f, carry: %.3f",
  898. i, last_cdp,
  899. scratch[CDP_primary_val].dv, scratch[CDP_val].dv);
  900. }
  901. /* In this case we just need to update cdp_prep for this RRA */
  902. else {
  903. if (isnan(pdp_temp[i])) {
  904. /* Just increase undefined zone */
  905. scratch[CDP_unkn_pdp_cnt].lv += pdp_steps;
  906. }
  907. else {
  908. /* Calculate cdp value */
  909. last_cdp = scratch[CDP_val].dv;
  910. switch (cf) {
  911. case RRD_CF_AVERAGE:
  912. if (isnan(last_cdp)) {
  913. scratch[CDP_val].dv = pdp_temp[i] * pdp_steps;
  914. }
  915. else {
  916. scratch[CDP_val].dv = last_cdp + pdp_temp[i] *
  917. pdp_steps;
  918. }
  919. break;
  920. case RRD_CF_MAXIMUM:
  921. scratch[CDP_val].dv = MAX(last_cdp, pdp_temp[i]);
  922. break;
  923. case RRD_CF_MINIMUM:
  924. scratch[CDP_val].dv = MIN(last_cdp, pdp_temp[i]);
  925. break;
  926. case RRD_CF_LAST:
  927. scratch[CDP_val].dv = pdp_temp[i];
  928. break;
  929. default:
  930. scratch[CDP_val].dv = NAN;
  931. break;
  932. }
  933. }
  934. msg_debug_rrd("aggregate cdp %d with pdp %.3f, "
  935. "stored value: %.3f",
  936. i, pdp_temp[i], scratch[CDP_val].dv);
  937. }
  938. }
  939. else {
  940. /* We have nothing to consolidate, but we may miss some pdp */
  941. if (pdp_steps > 2) {
  942. /* Just write PDP value */
  943. scratch[CDP_primary_val].dv = pdp_temp[i];
  944. scratch[CDP_secondary_val].dv = pdp_temp[i];
  945. }
  946. }
  947. }
  948. }
  949. /**
  950. * Update RRA in a file
  951. * @param file rrd file
  952. * @param rra_steps steps for each rra
  953. * @param now current time
  954. */
  955. void rspamd_rrd_write_rra(struct rspamd_rrd_file *file, gulong *rra_steps)
  956. {
  957. unsigned int i, j, ds_cnt;
  958. struct rrd_rra_def *rra;
  959. struct rrd_cdp_prep *cdp;
  960. double *rra_row = file->rrd_value, *cur_row;
  961. ds_cnt = file->stat_head->ds_cnt;
  962. /* Iterate over all RRA */
  963. for (i = 0; i < file->stat_head->rra_cnt; i++) {
  964. rra = &file->rra_def[i];
  965. if (rra_steps[i] > 0) {
  966. /* Move row ptr */
  967. if (++file->rra_ptr[i].cur_row >= rra->row_cnt) {
  968. file->rra_ptr[i].cur_row = 0;
  969. }
  970. /* Calculate seek */
  971. cdp = &file->cdp_prep[ds_cnt * i];
  972. cur_row = rra_row + ds_cnt * file->rra_ptr[i].cur_row;
  973. /* Iterate over DS */
  974. for (j = 0; j < ds_cnt; j++) {
  975. cur_row[j] = cdp[j].scratch[CDP_primary_val].dv;
  976. msg_debug_rrd("write cdp %d: %.3f", j, cur_row[j]);
  977. }
  978. }
  979. rra_row += rra->row_cnt * ds_cnt;
  980. }
  981. }
  982. /**
  983. * Add record to rrd file
  984. * @param file rrd file object
  985. * @param points points (must be row suitable for this RRA, depending on ds count)
  986. * @param err error pointer
  987. * @return TRUE if a row has been added
  988. */
  989. gboolean
  990. rspamd_rrd_add_record(struct rspamd_rrd_file *file,
  991. GArray *points,
  992. double ticks,
  993. GError **err)
  994. {
  995. double interval, *pdp_new, *pdp_temp;
  996. unsigned int i;
  997. glong seconds, microseconds;
  998. gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
  999. prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
  1000. if (file == NULL || file->stat_head->ds_cnt * sizeof(double) !=
  1001. points->len) {
  1002. g_set_error(err,
  1003. rrd_error_quark(), EINVAL,
  1004. "rrd add points failed: wrong arguments");
  1005. return FALSE;
  1006. }
  1007. /* Get interval */
  1008. seconds = (glong) ticks;
  1009. microseconds = (glong) ((ticks - seconds) * 1000000.);
  1010. interval = ticks - ((double) file->live_head->last_up +
  1011. file->live_head->last_up_usec / 1000000.);
  1012. msg_debug_rrd("update rrd record after %.3f seconds", interval);
  1013. /* Update PDP preparation values */
  1014. pdp_new = g_malloc0(sizeof(double) * file->stat_head->ds_cnt);
  1015. pdp_temp = g_malloc0(sizeof(double) * file->stat_head->ds_cnt);
  1016. /* How much steps need to be updated in each RRA */
  1017. rra_steps = g_malloc0(sizeof(gulong) * file->stat_head->rra_cnt);
  1018. if (!rspamd_rrd_update_pdp_prep(file, (double *) points->data, pdp_new,
  1019. interval)) {
  1020. g_set_error(err,
  1021. rrd_error_quark(), EINVAL,
  1022. "rrd update pdp failed: wrong arguments");
  1023. g_free(pdp_new);
  1024. g_free(pdp_temp);
  1025. g_free(rra_steps);
  1026. return FALSE;
  1027. }
  1028. /* Calculate elapsed steps */
  1029. /* Age in seconds for previous pdp store */
  1030. prev_pdp_age = file->live_head->last_up % file->stat_head->pdp_step;
  1031. /* Time in seconds for last pdp update */
  1032. prev_pdp_step = file->live_head->last_up - prev_pdp_age;
  1033. /* Age in seconds from current time to required pdp time */
  1034. cur_pdp_age = seconds % file->stat_head->pdp_step;
  1035. /* Time of desired pdp step */
  1036. cur_pdp_step = seconds - cur_pdp_age;
  1037. cur_pdp_count = cur_pdp_step / file->stat_head->pdp_step;
  1038. pdp_steps = (cur_pdp_step - prev_pdp_step) / file->stat_head->pdp_step;
  1039. if (pdp_steps == 0) {
  1040. /* Simple update of pdp prep */
  1041. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  1042. if (isnan(pdp_new[i])) {
  1043. /* Increment unknown period */
  1044. file->pdp_prep[i].scratch[PDP_unkn_sec_cnt].lv += floor(
  1045. interval);
  1046. }
  1047. else {
  1048. if (isnan(file->pdp_prep[i].scratch[PDP_val].dv)) {
  1049. /* Reset pdp to the current value */
  1050. file->pdp_prep[i].scratch[PDP_val].dv = pdp_new[i];
  1051. }
  1052. else {
  1053. /* Increment pdp value */
  1054. file->pdp_prep[i].scratch[PDP_val].dv += pdp_new[i];
  1055. }
  1056. }
  1057. }
  1058. }
  1059. else {
  1060. /* Complex update of PDP, CDP and RRA */
  1061. /* Update PDP for this step */
  1062. rspamd_rrd_update_pdp_step(file,
  1063. pdp_new,
  1064. pdp_temp,
  1065. interval,
  1066. pdp_steps * file->stat_head->pdp_step);
  1067. /* Update CDP points for each RRA*/
  1068. for (i = 0; i < file->stat_head->rra_cnt; i++) {
  1069. /* Calculate pdp offset for this RRA */
  1070. pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count %
  1071. file->rra_def[i].pdp_cnt;
  1072. /* How much steps we got for this RRA */
  1073. if (pdp_offset <= pdp_steps) {
  1074. rra_steps[i] =
  1075. (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1;
  1076. }
  1077. else {
  1078. /* This rra have not passed enough pdp steps */
  1079. rra_steps[i] = 0;
  1080. }
  1081. msg_debug_rrd("cdp: %ud, rra steps: %ul(%ul), pdp steps: %ul",
  1082. i, rra_steps[i], pdp_offset, pdp_steps);
  1083. /* Update this specific CDP */
  1084. rspamd_rrd_update_cdp(file,
  1085. pdp_steps,
  1086. pdp_offset,
  1087. rra_steps,
  1088. i,
  1089. pdp_temp);
  1090. }
  1091. /* Write RRA */
  1092. rspamd_rrd_write_rra(file, rra_steps);
  1093. }
  1094. file->live_head->last_up = seconds;
  1095. file->live_head->last_up_usec = microseconds;
  1096. /* Sync and invalidate */
  1097. msync(file->map, file->size, MS_ASYNC | MS_INVALIDATE);
  1098. g_free(pdp_new);
  1099. g_free(pdp_temp);
  1100. g_free(rra_steps);
  1101. return TRUE;
  1102. }
  1103. /**
  1104. * Close rrd file
  1105. * @param file
  1106. * @return
  1107. */
  1108. int rspamd_rrd_close(struct rspamd_rrd_file *file)
  1109. {
  1110. if (file == NULL) {
  1111. errno = EINVAL;
  1112. return -1;
  1113. }
  1114. munmap(file->map, file->size);
  1115. close(file->fd);
  1116. g_free(file->filename);
  1117. g_free(file->id);
  1118. g_free(file);
  1119. return 0;
  1120. }
  1121. static struct rspamd_rrd_file *
  1122. rspamd_rrd_create_file(const char *path, gboolean finalize, GError **err)
  1123. {
  1124. struct rspamd_rrd_file *file;
  1125. struct rrd_ds_def ds[RSPAMD_RRD_DS_COUNT];
  1126. struct rrd_rra_def rra[RSPAMD_RRD_RRA_COUNT];
  1127. int i;
  1128. GArray ar;
  1129. /* Try to create new rrd file */
  1130. file = rspamd_rrd_create(path, RSPAMD_RRD_DS_COUNT, RSPAMD_RRD_RRA_COUNT,
  1131. 1, rspamd_get_calendar_ticks(), err);
  1132. if (file == NULL) {
  1133. return NULL;
  1134. }
  1135. /* Create DS and RRA */
  1136. for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i++) {
  1137. rrd_make_default_ds(rspamd_action_to_str(i),
  1138. rrd_dst_to_string(RRD_DST_COUNTER), 1, &ds[i]);
  1139. }
  1140. ar.data = (char *) ds;
  1141. ar.len = sizeof(ds);
  1142. if (!rspamd_rrd_add_ds(file, &ar, err)) {
  1143. rspamd_rrd_close(file);
  1144. return NULL;
  1145. }
  1146. /* Once per minute for 1 day */
  1147. rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
  1148. 60, 24 * 60, &rra[0]);
  1149. /* Once per 5 minutes for 1 week */
  1150. rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
  1151. 5 * 60, 7 * 24 * 60 / 5, &rra[1]);
  1152. /* Once per 10 mins for 1 month */
  1153. rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
  1154. 60 * 10, 30 * 24 * 6, &rra[2]);
  1155. /* Once per hour for 1 year */
  1156. rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
  1157. 60 * 60, 365 * 24, &rra[3]);
  1158. ar.data = (char *) rra;
  1159. ar.len = sizeof(rra);
  1160. if (!rspamd_rrd_add_rra(file, &ar, err)) {
  1161. rspamd_rrd_close(file);
  1162. return NULL;
  1163. }
  1164. if (finalize && !rspamd_rrd_finalize(file, err)) {
  1165. rspamd_rrd_close(file);
  1166. return NULL;
  1167. }
  1168. return file;
  1169. }
  1170. static void
  1171. rspamd_rrd_convert_ds(struct rspamd_rrd_file *old,
  1172. struct rspamd_rrd_file *cur, int idx_old, int idx_new)
  1173. {
  1174. struct rrd_pdp_prep *pdp_prep_old, *pdp_prep_new;
  1175. struct rrd_cdp_prep *cdp_prep_old, *cdp_prep_new;
  1176. double *val_old, *val_new;
  1177. gulong rra_cnt, i, j, points_cnt, old_ds, new_ds;
  1178. rra_cnt = old->stat_head->rra_cnt;
  1179. pdp_prep_old = &old->pdp_prep[idx_old];
  1180. pdp_prep_new = &cur->pdp_prep[idx_new];
  1181. memcpy(pdp_prep_new, pdp_prep_old, sizeof(*pdp_prep_new));
  1182. val_old = old->rrd_value;
  1183. val_new = cur->rrd_value;
  1184. old_ds = old->stat_head->ds_cnt;
  1185. new_ds = cur->stat_head->ds_cnt;
  1186. for (i = 0; i < rra_cnt; i++) {
  1187. cdp_prep_old = &old->cdp_prep[i * old_ds] + idx_old;
  1188. cdp_prep_new = &cur->cdp_prep[i * new_ds] + idx_new;
  1189. memcpy(cdp_prep_new, cdp_prep_old, sizeof(*cdp_prep_new));
  1190. points_cnt = old->rra_def[i].row_cnt;
  1191. for (j = 0; j < points_cnt; j++) {
  1192. val_new[j * new_ds + idx_new] = val_old[j * old_ds + idx_old];
  1193. }
  1194. val_new += points_cnt * new_ds;
  1195. val_old += points_cnt * old_ds;
  1196. }
  1197. }
  1198. static struct rspamd_rrd_file *
  1199. rspamd_rrd_convert(const char *path, struct rspamd_rrd_file *old,
  1200. GError **err)
  1201. {
  1202. struct rspamd_rrd_file *rrd;
  1203. char tpath[PATH_MAX];
  1204. g_assert(old != NULL);
  1205. rspamd_snprintf(tpath, sizeof(tpath), "%s.new", path);
  1206. rrd = rspamd_rrd_create_file(tpath, TRUE, err);
  1207. if (rrd) {
  1208. /* Copy old data */
  1209. memcpy(rrd->live_head, old->live_head, sizeof(*rrd->live_head));
  1210. memcpy(rrd->rra_ptr, old->rra_ptr,
  1211. sizeof(*old->rra_ptr) * rrd->stat_head->rra_cnt);
  1212. /*
  1213. * Old DSes:
  1214. * 0 - spam -> reject
  1215. * 1 - probable spam -> add header
  1216. * 2 - greylist -> greylist
  1217. * 3 - ham -> ham
  1218. */
  1219. rspamd_rrd_convert_ds(old, rrd, 0, METRIC_ACTION_REJECT);
  1220. rspamd_rrd_convert_ds(old, rrd, 1, METRIC_ACTION_ADD_HEADER);
  1221. rspamd_rrd_convert_ds(old, rrd, 2, METRIC_ACTION_GREYLIST);
  1222. rspamd_rrd_convert_ds(old, rrd, 3, METRIC_ACTION_NOACTION);
  1223. if (unlink(path) == -1) {
  1224. g_set_error(err, rrd_error_quark(), errno, "cannot unlink old rrd file %s: %s",
  1225. path, strerror(errno));
  1226. unlink(tpath);
  1227. rspamd_rrd_close(rrd);
  1228. return NULL;
  1229. }
  1230. if (rename(tpath, path) == -1) {
  1231. g_set_error(err, rrd_error_quark(), errno, "cannot rename old rrd file %s: %s",
  1232. path, strerror(errno));
  1233. unlink(tpath);
  1234. rspamd_rrd_close(rrd);
  1235. return NULL;
  1236. }
  1237. }
  1238. return rrd;
  1239. }
  1240. struct rspamd_rrd_file *
  1241. rspamd_rrd_file_default(const char *path,
  1242. GError **err)
  1243. {
  1244. struct rspamd_rrd_file *file, *nf;
  1245. g_assert(path != NULL);
  1246. if (access(path, R_OK) != -1) {
  1247. /* We can open rrd file */
  1248. file = rspamd_rrd_open(path, err);
  1249. if (file == NULL) {
  1250. return NULL;
  1251. }
  1252. if (file->stat_head->rra_cnt != RSPAMD_RRD_RRA_COUNT) {
  1253. msg_err_rrd("rrd file is not suitable for rspamd: it has "
  1254. "%ul ds and %ul rra",
  1255. file->stat_head->ds_cnt,
  1256. file->stat_head->rra_cnt);
  1257. g_set_error(err, rrd_error_quark(), EINVAL, "bad rrd file");
  1258. rspamd_rrd_close(file);
  1259. return NULL;
  1260. }
  1261. else if (file->stat_head->ds_cnt == RSPAMD_RRD_OLD_DS_COUNT) {
  1262. /* Old rrd, need to convert */
  1263. msg_info_rrd("rrd file %s is not suitable for rspamd, convert it",
  1264. path);
  1265. nf = rspamd_rrd_convert(path, file, err);
  1266. rspamd_rrd_close(file);
  1267. return nf;
  1268. }
  1269. else if (file->stat_head->ds_cnt == RSPAMD_RRD_DS_COUNT) {
  1270. return file;
  1271. }
  1272. else {
  1273. msg_err_rrd("rrd file is not suitable for rspamd: it has "
  1274. "%ul ds and %ul rra",
  1275. file->stat_head->ds_cnt,
  1276. file->stat_head->rra_cnt);
  1277. g_set_error(err, rrd_error_quark(), EINVAL, "bad rrd file");
  1278. rspamd_rrd_close(file);
  1279. return NULL;
  1280. }
  1281. }
  1282. file = rspamd_rrd_create_file(path, TRUE, err);
  1283. return file;
  1284. }
  1285. struct rspamd_rrd_query_result *
  1286. rspamd_rrd_query(struct rspamd_rrd_file *file,
  1287. gulong rra_num)
  1288. {
  1289. struct rspamd_rrd_query_result *res;
  1290. struct rrd_rra_def *rra;
  1291. const double *rra_offset = NULL;
  1292. unsigned int i;
  1293. g_assert(file != NULL);
  1294. if (rra_num > file->stat_head->rra_cnt) {
  1295. msg_err_rrd("requested unexisting rra: %l", rra_num);
  1296. return NULL;
  1297. }
  1298. res = g_malloc0(sizeof(*res));
  1299. res->ds_count = file->stat_head->ds_cnt;
  1300. res->last_update = (double) file->live_head->last_up +
  1301. ((double) file->live_head->last_up_usec / 1e6f);
  1302. res->pdp_per_cdp = file->rra_def[rra_num].pdp_cnt;
  1303. res->rra_rows = file->rra_def[rra_num].row_cnt;
  1304. rra_offset = file->rrd_value;
  1305. for (i = 0; i < file->stat_head->rra_cnt; i++) {
  1306. rra = &file->rra_def[i];
  1307. if (i == rra_num) {
  1308. res->cur_row = file->rra_ptr[i].cur_row % rra->row_cnt;
  1309. break;
  1310. }
  1311. rra_offset += rra->row_cnt * res->ds_count;
  1312. }
  1313. res->data = rra_offset;
  1314. return res;
  1315. }