You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

rrd.c 37KB


  1. /*
  2. * Copyright 2024 Vsevolod Stakhov
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "config.h"
  17. #include "rrd.h"
  18. #include "util.h"
  19. #include "cfg_file.h"
  20. #include "logger.h"
  21. #include "unix-std.h"
  22. #include "cryptobox.h"
  23. #include <math.h>
  24. #define RSPAMD_RRD_DS_COUNT METRIC_ACTION_MAX
  25. #define RSPAMD_RRD_OLD_DS_COUNT 4
  26. #define RSPAMD_RRD_RRA_COUNT 4
  27. #define msg_err_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
  28. "rrd", file->id, \
  29. G_STRFUNC, \
  30. __VA_ARGS__)
  31. #define msg_warn_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \
  32. "rrd", file->id, \
  33. G_STRFUNC, \
  34. __VA_ARGS__)
  35. #define msg_info_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
  36. "rrd", file->id, \
  37. G_STRFUNC, \
  38. __VA_ARGS__)
  39. #define msg_debug_rrd(...) rspamd_conditional_debug_fast(NULL, NULL, \
  40. rspamd_rrd_log_id, "rrd", file->id, \
  41. G_STRFUNC, \
  42. __VA_ARGS__)
  43. INIT_LOG_MODULE(rrd)
  44. static GQuark
  45. rrd_error_quark(void)
  46. {
  47. return g_quark_from_static_string("rrd-error");
  48. }
  49. /**
  50. * Convert rrd dst type from string to numeric value
  51. */
  52. enum rrd_dst_type
  53. rrd_dst_from_string(const char *str)
  54. {
  55. if (g_ascii_strcasecmp(str, "counter") == 0) {
  56. return RRD_DST_COUNTER;
  57. }
  58. else if (g_ascii_strcasecmp(str, "absolute") == 0) {
  59. return RRD_DST_ABSOLUTE;
  60. }
  61. else if (g_ascii_strcasecmp(str, "gauge") == 0) {
  62. return RRD_DST_GAUGE;
  63. }
  64. else if (g_ascii_strcasecmp(str, "cdef") == 0) {
  65. return RRD_DST_CDEF;
  66. }
  67. else if (g_ascii_strcasecmp(str, "derive") == 0) {
  68. return RRD_DST_DERIVE;
  69. }
  70. return RRD_DST_INVALID;
  71. }
  72. /**
  73. * Convert numeric presentation of dst to string
  74. */
  75. const char *
  76. rrd_dst_to_string(enum rrd_dst_type type)
  77. {
  78. switch (type) {
  79. case RRD_DST_COUNTER:
  80. return "COUNTER";
  81. case RRD_DST_ABSOLUTE:
  82. return "ABSOLUTE";
  83. case RRD_DST_GAUGE:
  84. return "GAUGE";
  85. case RRD_DST_CDEF:
  86. return "CDEF";
  87. case RRD_DST_DERIVE:
  88. return "DERIVE";
  89. default:
  90. return "U";
  91. }
  92. return "U";
  93. }
  94. /**
  95. * Convert rrd consolidation function type from string to numeric value
  96. */
  97. enum rrd_cf_type
  98. rrd_cf_from_string(const char *str)
  99. {
  100. if (g_ascii_strcasecmp(str, "average") == 0) {
  101. return RRD_CF_AVERAGE;
  102. }
  103. else if (g_ascii_strcasecmp(str, "minimum") == 0) {
  104. return RRD_CF_MINIMUM;
  105. }
  106. else if (g_ascii_strcasecmp(str, "maximum") == 0) {
  107. return RRD_CF_MAXIMUM;
  108. }
  109. else if (g_ascii_strcasecmp(str, "last") == 0) {
  110. return RRD_CF_LAST;
  111. }
  112. /* XXX: add other CF functions supported by rrd */
  113. return RRD_CF_INVALID;
  114. }
  115. /**
  116. * Convert numeric presentation of cf to string
  117. */
  118. const char *
  119. rrd_cf_to_string(enum rrd_cf_type type)
  120. {
  121. switch (type) {
  122. case RRD_CF_AVERAGE:
  123. return "AVERAGE";
  124. case RRD_CF_MINIMUM:
  125. return "MINIMUM";
  126. case RRD_CF_MAXIMUM:
  127. return "MAXIMUM";
  128. case RRD_CF_LAST:
  129. return "LAST";
  130. default:
  131. return "U";
  132. }
  133. /* XXX: add other CF functions supported by rrd */
  134. return "U";
  135. }
  136. void rrd_make_default_rra(const char *cf_name,
  137. gulong pdp_cnt,
  138. gulong rows,
  139. struct rrd_rra_def *rra)
  140. {
  141. g_assert(cf_name != NULL);
  142. g_assert(rrd_cf_from_string(cf_name) != RRD_CF_INVALID);
  143. rra->pdp_cnt = pdp_cnt;
  144. rra->row_cnt = rows;
  145. rspamd_strlcpy(rra->cf_nam, cf_name, sizeof(rra->cf_nam));
  146. memset(rra->par, 0, sizeof(rra->par));
  147. rra->par[RRA_cdp_xff_val].dv = 0.5;
  148. }
  149. void rrd_make_default_ds(const char *name,
  150. const char *type,
  151. gulong pdp_step,
  152. struct rrd_ds_def *ds)
  153. {
  154. g_assert(name != NULL);
  155. g_assert(type != NULL);
  156. g_assert(rrd_dst_from_string(type) != RRD_DST_INVALID);
  157. rspamd_strlcpy(ds->ds_nam, name, sizeof(ds->ds_nam));
  158. rspamd_strlcpy(ds->dst, type, sizeof(ds->dst));
  159. memset(ds->par, 0, sizeof(ds->par));
  160. ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2;
  161. ds->par[RRD_DS_min_val].dv = NAN;
  162. ds->par[RRD_DS_max_val].dv = NAN;
  163. }
  164. /**
  165. * Check rrd file for correctness (size, cookies, etc)
  166. */
  167. static gboolean
  168. rspamd_rrd_check_file(const char *filename, gboolean need_data, GError **err)
  169. {
  170. int fd, i;
  171. struct stat st;
  172. struct rrd_file_head head;
  173. struct rrd_rra_def rra;
  174. int head_size;
  175. fd = open(filename, O_RDWR);
  176. if (fd == -1) {
  177. g_set_error(err,
  178. rrd_error_quark(), errno, "rrd open error: %s", strerror(errno));
  179. return FALSE;
  180. }
  181. if (fstat(fd, &st) == -1) {
  182. g_set_error(err,
  183. rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno));
  184. close(fd);
  185. return FALSE;
  186. }
  187. if (st.st_size < (goffset) sizeof(struct rrd_file_head)) {
  188. /* We have trimmed file */
  189. g_set_error(err, rrd_error_quark(), EINVAL, "rrd size is bad: %ud",
  190. (unsigned int) st.st_size);
  191. close(fd);
  192. return FALSE;
  193. }
  194. /* Try to read header */
  195. if (read(fd, &head, sizeof(head)) != sizeof(head)) {
  196. g_set_error(err,
  197. rrd_error_quark(), errno, "rrd read head error: %s",
  198. strerror(errno));
  199. close(fd);
  200. return FALSE;
  201. }
  202. /* Check magic */
  203. if (memcmp(head.version, RRD_VERSION, sizeof(head.version)) != 0) {
  204. g_set_error(err,
  205. rrd_error_quark(), EINVAL, "rrd head error: bad cookie");
  206. close(fd);
  207. return FALSE;
  208. }
  209. if (head.float_cookie != RRD_FLOAT_COOKIE) {
  210. g_set_error(err,
  211. rrd_error_quark(), EINVAL, "rrd head error: another architecture "
  212. "(file cookie %g != our cookie %g)",
  213. head.float_cookie, RRD_FLOAT_COOKIE);
  214. close(fd);
  215. return FALSE;
  216. }
  217. /* Check for other params */
  218. if (head.ds_cnt <= 0 || head.rra_cnt <= 0) {
  219. g_set_error(err,
  220. rrd_error_quark(), EINVAL, "rrd head cookies error: bad rra or ds count");
  221. close(fd);
  222. return FALSE;
  223. }
  224. /* Now we can calculate the overall size of rrd */
  225. head_size = sizeof(struct rrd_file_head) +
  226. sizeof(struct rrd_ds_def) * head.ds_cnt +
  227. sizeof(struct rrd_rra_def) * head.rra_cnt +
  228. sizeof(struct rrd_live_head) +
  229. sizeof(struct rrd_pdp_prep) * head.ds_cnt +
  230. sizeof(struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt +
  231. sizeof(struct rrd_rra_ptr) * head.rra_cnt;
  232. if (st.st_size < (goffset) head_size) {
  233. g_set_error(err,
  234. rrd_error_quark(), errno, "rrd file seems to have stripped header: %d",
  235. head_size);
  236. close(fd);
  237. return FALSE;
  238. }
  239. if (need_data) {
  240. /* Now check rra */
  241. if (lseek(fd, sizeof(struct rrd_ds_def) * head.ds_cnt,
  242. SEEK_CUR) == -1) {
  243. g_set_error(err,
  244. rrd_error_quark(), errno, "rrd head lseek error: %s",
  245. strerror(errno));
  246. close(fd);
  247. return FALSE;
  248. }
  249. for (i = 0; i < (int) head.rra_cnt; i++) {
  250. if (read(fd, &rra, sizeof(rra)) != sizeof(rra)) {
  251. g_set_error(err,
  252. rrd_error_quark(), errno, "rrd read rra error: %s",
  253. strerror(errno));
  254. close(fd);
  255. return FALSE;
  256. }
  257. head_size += rra.row_cnt * head.ds_cnt * sizeof(double);
  258. }
  259. if (st.st_size != head_size) {
  260. g_set_error(err,
  261. rrd_error_quark(), EINVAL, "rrd file seems to have incorrect size: %d, must be %d",
  262. (int) st.st_size, head_size);
  263. close(fd);
  264. return FALSE;
  265. }
  266. }
  267. close(fd);
  268. return TRUE;
  269. }
  270. /**
  271. * Adjust pointers in mmapped rrd file
  272. * @param file
  273. */
  274. static void
  275. rspamd_rrd_adjust_pointers(struct rspamd_rrd_file *file, gboolean completed)
  276. {
  277. uint8_t *ptr;
  278. ptr = file->map;
  279. file->stat_head = (struct rrd_file_head *) ptr;
  280. ptr += sizeof(struct rrd_file_head);
  281. file->ds_def = (struct rrd_ds_def *) ptr;
  282. ptr += sizeof(struct rrd_ds_def) * file->stat_head->ds_cnt;
  283. file->rra_def = (struct rrd_rra_def *) ptr;
  284. ptr += sizeof(struct rrd_rra_def) * file->stat_head->rra_cnt;
  285. file->live_head = (struct rrd_live_head *) ptr;
  286. ptr += sizeof(struct rrd_live_head);
  287. file->pdp_prep = (struct rrd_pdp_prep *) ptr;
  288. ptr += sizeof(struct rrd_pdp_prep) * file->stat_head->ds_cnt;
  289. file->cdp_prep = (struct rrd_cdp_prep *) ptr;
  290. ptr += sizeof(struct rrd_cdp_prep) * file->stat_head->rra_cnt *
  291. file->stat_head->ds_cnt;
  292. file->rra_ptr = (struct rrd_rra_ptr *) ptr;
  293. if (completed) {
  294. ptr += sizeof(struct rrd_rra_ptr) * file->stat_head->rra_cnt;
  295. file->rrd_value = (double *) ptr;
  296. }
  297. else {
  298. file->rrd_value = NULL;
  299. }
  300. }
  301. static void
  302. rspamd_rrd_calculate_checksum(struct rspamd_rrd_file *file)
  303. {
  304. unsigned char sigbuf[rspamd_cryptobox_HASHBYTES];
  305. struct rrd_ds_def *ds;
  306. unsigned int i;
  307. rspamd_cryptobox_hash_state_t st;
  308. if (file->finalized) {
  309. rspamd_cryptobox_hash_init(&st, NULL, 0);
  310. rspamd_cryptobox_hash_update(&st, file->filename, strlen(file->filename));
  311. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  312. ds = &file->ds_def[i];
  313. rspamd_cryptobox_hash_update(&st, ds->ds_nam, sizeof(ds->ds_nam));
  314. }
  315. rspamd_cryptobox_hash_final(&st, sigbuf);
  316. file->id = rspamd_encode_base32(sigbuf, sizeof(sigbuf), RSPAMD_BASE32_DEFAULT);
  317. }
  318. }
  319. static int
  320. rspamd_rrd_open_exclusive(const char *filename)
  321. {
  322. struct timespec sleep_ts = {
  323. .tv_sec = 0,
  324. .tv_nsec = 1000000};
  325. int fd;
  326. fd = open(filename, O_RDWR);
  327. if (fd == -1) {
  328. return -1;
  329. }
  330. for (;;) {
  331. if (rspamd_file_lock(fd, TRUE) == -1) {
  332. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  333. nanosleep(&sleep_ts, NULL);
  334. continue;
  335. }
  336. else {
  337. close(fd);
  338. return -1;
  339. }
  340. }
  341. else {
  342. break;
  343. }
  344. }
  345. return fd;
  346. };
  347. /**
  348. * Open completed or incompleted rrd file
  349. * @param filename
  350. * @param completed
  351. * @param err
  352. * @return
  353. */
  354. static struct rspamd_rrd_file *
  355. rspamd_rrd_open_common(const char *filename, gboolean completed, GError **err)
  356. {
  357. struct rspamd_rrd_file *file;
  358. int fd;
  359. struct stat st;
  360. if (!rspamd_rrd_check_file(filename, completed, err)) {
  361. return NULL;
  362. }
  363. file = g_malloc0(sizeof(struct rspamd_rrd_file));
  364. /* Open file */
  365. fd = rspamd_rrd_open_exclusive(filename);
  366. if (fd == -1) {
  367. g_set_error(err,
  368. rrd_error_quark(), errno, "rrd open error: %s", strerror(errno));
  369. g_free(file);
  370. return FALSE;
  371. }
  372. if (fstat(fd, &st) == -1) {
  373. g_set_error(err,
  374. rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno));
  375. rspamd_file_unlock(fd, FALSE);
  376. g_free(file);
  377. close(fd);
  378. return FALSE;
  379. }
  380. /* Mmap file */
  381. file->size = st.st_size;
  382. if ((file->map =
  383. mmap(NULL, st.st_size, PROT_READ | PROT_WRITE,
  384. MAP_SHARED, fd, 0)) == MAP_FAILED) {
  385. rspamd_file_unlock(fd, FALSE);
  386. close(fd);
  387. g_set_error(err,
  388. rrd_error_quark(), ENOMEM, "mmap failed: %s", strerror(errno));
  389. g_free(file);
  390. return NULL;
  391. }
  392. file->fd = fd;
  393. /* Adjust pointers */
  394. rspamd_rrd_adjust_pointers(file, completed);
  395. /* Mark it as finalized */
  396. file->finalized = completed;
  397. file->filename = g_strdup(filename);
  398. rspamd_rrd_calculate_checksum(file);
  399. return file;
  400. }
  401. /**
  402. * Open (and mmap) existing RRD file
  403. * @param filename path
  404. * @param err error pointer
  405. * @return rrd file structure
  406. */
  407. struct rspamd_rrd_file *
  408. rspamd_rrd_open(const char *filename, GError **err)
  409. {
  410. struct rspamd_rrd_file *file;
  411. if ((file = rspamd_rrd_open_common(filename, TRUE, err))) {
  412. msg_info_rrd("rrd file opened: %s", filename);
  413. }
  414. return file;
  415. }
  416. /**
  417. * Create basic header for rrd file
  418. * @param filename file path
  419. * @param ds_count number of data sources
  420. * @param rra_count number of round robin archives
  421. * @param pdp_step step of primary data points
  422. * @param err error pointer
  423. * @return TRUE if file has been created
  424. */
  425. struct rspamd_rrd_file *
  426. rspamd_rrd_create(const char *filename,
  427. gulong ds_count,
  428. gulong rra_count,
  429. gulong pdp_step,
  430. double initial_ticks,
  431. GError **err)
  432. {
  433. struct rspamd_rrd_file *new;
  434. struct rrd_file_head head;
  435. struct rrd_ds_def ds;
  436. struct rrd_rra_def rra;
  437. struct rrd_live_head lh;
  438. struct rrd_pdp_prep pdp;
  439. struct rrd_cdp_prep cdp;
  440. struct rrd_rra_ptr rra_ptr;
  441. int fd;
  442. unsigned int i, j;
  443. /* Open file */
  444. fd = open(filename, O_RDWR | O_CREAT | O_EXCL, 0644);
  445. if (fd == -1) {
  446. g_set_error(err,
  447. rrd_error_quark(), errno, "rrd create error: %s",
  448. strerror(errno));
  449. return NULL;
  450. }
  451. rspamd_file_lock(fd, FALSE);
  452. /* Fill header */
  453. memset(&head, 0, sizeof(head));
  454. head.rra_cnt = rra_count;
  455. head.ds_cnt = ds_count;
  456. head.pdp_step = pdp_step;
  457. memcpy(head.cookie, RRD_COOKIE, sizeof(head.cookie));
  458. memcpy(head.version, RRD_VERSION, sizeof(head.version));
  459. head.float_cookie = RRD_FLOAT_COOKIE;
  460. if (write(fd, &head, sizeof(head)) != sizeof(head)) {
  461. rspamd_file_unlock(fd, FALSE);
  462. close(fd);
  463. g_set_error(err,
  464. rrd_error_quark(), errno, "rrd write error: %s", strerror(errno));
  465. return NULL;
  466. }
  467. /* Fill DS section */
  468. memset(&ds, 0, sizeof(ds));
  469. memset(&ds.ds_nam, 0, sizeof(ds.ds_nam));
  470. memcpy(&ds.dst, "COUNTER", sizeof("COUNTER"));
  471. memset(&ds.par, 0, sizeof(ds.par));
  472. for (i = 0; i < ds_count; i++) {
  473. if (write(fd, &ds, sizeof(ds)) != sizeof(ds)) {
  474. rspamd_file_unlock(fd, FALSE);
  475. close(fd);
  476. g_set_error(err,
  477. rrd_error_quark(), errno, "rrd write error: %s",
  478. strerror(errno));
  479. return NULL;
  480. }
  481. }
  482. /* Fill RRA section */
  483. memset(&rra, 0, sizeof(rra));
  484. memcpy(&rra.cf_nam, "AVERAGE", sizeof("AVERAGE"));
  485. rra.pdp_cnt = 1;
  486. memset(&rra.par, 0, sizeof(rra.par));
  487. for (i = 0; i < rra_count; i++) {
  488. if (write(fd, &rra, sizeof(rra)) != sizeof(rra)) {
  489. rspamd_file_unlock(fd, FALSE);
  490. close(fd);
  491. g_set_error(err,
  492. rrd_error_quark(), errno, "rrd write error: %s",
  493. strerror(errno));
  494. return NULL;
  495. }
  496. }
  497. /* Fill live header */
  498. memset(&lh, 0, sizeof(lh));
  499. lh.last_up = (glong) initial_ticks;
  500. lh.last_up_usec = (glong) ((initial_ticks - lh.last_up) * 1e6f);
  501. if (write(fd, &lh, sizeof(lh)) != sizeof(lh)) {
  502. rspamd_file_unlock(fd, FALSE);
  503. close(fd);
  504. g_set_error(err,
  505. rrd_error_quark(), errno, "rrd write error: %s", strerror(errno));
  506. return NULL;
  507. }
  508. /* Fill pdp prep */
  509. memset(&pdp, 0, sizeof(pdp));
  510. memcpy(&pdp.last_ds, "U", sizeof("U"));
  511. memset(&pdp.scratch, 0, sizeof(pdp.scratch));
  512. pdp.scratch[PDP_val].dv = NAN;
  513. pdp.scratch[PDP_unkn_sec_cnt].lv = 0;
  514. for (i = 0; i < ds_count; i++) {
  515. if (write(fd, &pdp, sizeof(pdp)) != sizeof(pdp)) {
  516. rspamd_file_unlock(fd, FALSE);
  517. close(fd);
  518. g_set_error(err,
  519. rrd_error_quark(), errno, "rrd write error: %s",
  520. strerror(errno));
  521. return NULL;
  522. }
  523. }
  524. /* Fill cdp prep */
  525. memset(&cdp, 0, sizeof(cdp));
  526. memset(&cdp.scratch, 0, sizeof(cdp.scratch));
  527. cdp.scratch[CDP_val].dv = NAN;
  528. cdp.scratch[CDP_unkn_pdp_cnt].lv = 0;
  529. for (i = 0; i < rra_count; i++) {
  530. for (j = 0; j < ds_count; j++) {
  531. if (write(fd, &cdp, sizeof(cdp)) != sizeof(cdp)) {
  532. rspamd_file_unlock(fd, FALSE);
  533. close(fd);
  534. g_set_error(err,
  535. rrd_error_quark(), errno, "rrd write error: %s",
  536. strerror(errno));
  537. return NULL;
  538. }
  539. }
  540. }
  541. /* Set row pointers */
  542. memset(&rra_ptr, 0, sizeof(rra_ptr));
  543. for (i = 0; i < rra_count; i++) {
  544. if (write(fd, &rra_ptr, sizeof(rra_ptr)) != sizeof(rra_ptr)) {
  545. rspamd_file_unlock(fd, FALSE);
  546. close(fd);
  547. g_set_error(err,
  548. rrd_error_quark(), errno, "rrd write error: %s",
  549. strerror(errno));
  550. return NULL;
  551. }
  552. }
  553. rspamd_file_unlock(fd, FALSE);
  554. close(fd);
  555. new = rspamd_rrd_open_common(filename, FALSE, err);
  556. return new;
  557. }
  558. /**
  559. * Add data sources to rrd file
  560. * @param filename path to file
  561. * @param ds array of struct rrd_ds_def
  562. * @param err error pointer
  563. * @return TRUE if data sources were added
  564. */
  565. gboolean
  566. rspamd_rrd_add_ds(struct rspamd_rrd_file *file, GArray *ds, GError **err)
  567. {
  568. if (file == NULL || file->stat_head->ds_cnt * sizeof(struct rrd_ds_def) !=
  569. ds->len) {
  570. g_set_error(err,
  571. rrd_error_quark(), EINVAL, "rrd add ds failed: wrong arguments");
  572. return FALSE;
  573. }
  574. /* Straightforward memcpy */
  575. memcpy(file->ds_def, ds->data, ds->len);
  576. return TRUE;
  577. }
  578. /**
  579. * Add round robin archives to rrd file
  580. * @param filename path to file
  581. * @param ds array of struct rrd_rra_def
  582. * @param err error pointer
  583. * @return TRUE if archives were added
  584. */
  585. gboolean
  586. rspamd_rrd_add_rra(struct rspamd_rrd_file *file, GArray *rra, GError **err)
  587. {
  588. if (file == NULL || file->stat_head->rra_cnt *
  589. sizeof(struct rrd_rra_def) !=
  590. rra->len) {
  591. g_set_error(err,
  592. rrd_error_quark(), EINVAL, "rrd add rra failed: wrong arguments");
  593. return FALSE;
  594. }
  595. /* Straightforward memcpy */
  596. memcpy(file->rra_def, rra->data, rra->len);
  597. return TRUE;
  598. }
  599. /**
  600. * Finalize rrd file header and initialize all RRA in the file
  601. * @param filename file path
  602. * @param err error pointer
  603. * @return TRUE if rrd file is ready for use
  604. */
  605. gboolean
  606. rspamd_rrd_finalize(struct rspamd_rrd_file *file, GError **err)
  607. {
  608. int fd;
  609. unsigned int i;
  610. int count = 0;
  611. double vbuf[1024];
  612. struct stat st;
  613. if (file == NULL || file->filename == NULL || file->fd == -1) {
  614. g_set_error(err,
  615. rrd_error_quark(), EINVAL, "rrd add rra failed: wrong arguments");
  616. return FALSE;
  617. }
  618. fd = file->fd;
  619. if (lseek(fd, 0, SEEK_END) == -1) {
  620. g_set_error(err,
  621. rrd_error_quark(), errno, "rrd seek error: %s", strerror(errno));
  622. close(fd);
  623. return FALSE;
  624. }
  625. /* Adjust CDP */
  626. for (i = 0; i < file->stat_head->rra_cnt; i++) {
  627. file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv = 0;
  628. /* Randomize row pointer (disabled) */
  629. /* file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt; */
  630. file->rra_ptr->cur_row = file->rra_def[i].row_cnt - 1;
  631. /* Calculate values count */
  632. count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
  633. }
  634. munmap(file->map, file->size);
  635. /* Write values */
  636. for (i = 0; i < G_N_ELEMENTS(vbuf); i++) {
  637. vbuf[i] = NAN;
  638. }
  639. while (count > 0) {
  640. /* Write values in buffered matter */
  641. if (write(fd, vbuf,
  642. MIN((int) G_N_ELEMENTS(vbuf), count) * sizeof(double)) == -1) {
  643. g_set_error(err,
  644. rrd_error_quark(), errno, "rrd write error: %s",
  645. strerror(errno));
  646. close(fd);
  647. return FALSE;
  648. }
  649. count -= G_N_ELEMENTS(vbuf);
  650. }
  651. if (fstat(fd, &st) == -1) {
  652. g_set_error(err,
  653. rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno));
  654. close(fd);
  655. return FALSE;
  656. }
  657. /* Mmap again */
  658. file->size = st.st_size;
  659. if ((file->map =
  660. mmap(NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd,
  661. 0)) == MAP_FAILED) {
  662. close(fd);
  663. g_set_error(err,
  664. rrd_error_quark(), ENOMEM, "mmap failed: %s", strerror(errno));
  665. return FALSE;
  666. }
  667. /* Adjust pointers */
  668. rspamd_rrd_adjust_pointers(file, TRUE);
  669. file->finalized = TRUE;
  670. rspamd_rrd_calculate_checksum(file);
  671. msg_info_rrd("rrd file created: %s", file->filename);
  672. return TRUE;
  673. }
  674. /**
  675. * Update pdp_prep data
  676. * @param file rrd file
  677. * @param vals new values
  678. * @param pdp_new new pdp array
  679. * @param interval time elapsed from the last update
  680. * @return
  681. */
  682. static gboolean
  683. rspamd_rrd_update_pdp_prep(struct rspamd_rrd_file *file,
  684. double *vals,
  685. double *pdp_new,
  686. double interval)
  687. {
  688. unsigned int i;
  689. enum rrd_dst_type type;
  690. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  691. type = rrd_dst_from_string(file->ds_def[i].dst);
  692. if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv < interval) {
  693. rspamd_strlcpy(file->pdp_prep[i].last_ds, "U",
  694. sizeof(file->pdp_prep[i].last_ds));
  695. pdp_new[i] = NAN;
  696. msg_debug_rrd("adding unknown point interval %.3f is less than heartbeat %l",
  697. interval, file->ds_def[i].par[RRD_DS_mrhb_cnt].lv);
  698. }
  699. else {
  700. switch (type) {
  701. case RRD_DST_COUNTER:
  702. case RRD_DST_DERIVE:
  703. if (file->pdp_prep[i].last_ds[0] == 'U') {
  704. pdp_new[i] = NAN;
  705. msg_debug_rrd("last point is NaN for point %ud", i);
  706. }
  707. else {
  708. pdp_new[i] = vals[i] - strtod(file->pdp_prep[i].last_ds,
  709. NULL);
  710. msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]);
  711. }
  712. break;
  713. case RRD_DST_GAUGE:
  714. pdp_new[i] = vals[i] * interval;
  715. msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]);
  716. break;
  717. case RRD_DST_ABSOLUTE:
  718. pdp_new[i] = vals[i];
  719. msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]);
  720. break;
  721. default:
  722. return FALSE;
  723. }
  724. }
  725. /* Copy value to the last_ds */
  726. if (!isnan(vals[i])) {
  727. rspamd_snprintf(file->pdp_prep[i].last_ds,
  728. sizeof(file->pdp_prep[i].last_ds), "%.4f", vals[i]);
  729. }
  730. else {
  731. file->pdp_prep[i].last_ds[0] = 'U';
  732. file->pdp_prep[i].last_ds[1] = '\0';
  733. }
  734. }
  735. return TRUE;
  736. }
  737. /**
  738. * Update step for this pdp
  739. * @param file
  740. * @param pdp_new new pdp array
  741. * @param pdp_temp temp pdp array
  742. * @param interval time till last update
  743. * @param pre_int pre interval
  744. * @param post_int post intervall
  745. * @param pdp_diff time till last pdp update
  746. */
  747. static void
  748. rspamd_rrd_update_pdp_step(struct rspamd_rrd_file *file,
  749. double *pdp_new,
  750. double *pdp_temp,
  751. double interval,
  752. gulong pdp_diff)
  753. {
  754. unsigned int i;
  755. rrd_value_t *scratch;
  756. gulong heartbeat;
  757. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  758. scratch = file->pdp_prep[i].scratch;
  759. heartbeat = file->ds_def[i].par[RRD_DS_mrhb_cnt].lv;
  760. if (!isnan(pdp_new[i])) {
  761. if (isnan(scratch[PDP_val].dv)) {
  762. scratch[PDP_val].dv = 0;
  763. }
  764. }
  765. /* Check interval value for heartbeat for this DS */
  766. if ((interval > heartbeat) ||
  767. (file->stat_head->pdp_step / 2.0 < scratch[PDP_unkn_sec_cnt].lv)) {
  768. pdp_temp[i] = NAN;
  769. }
  770. else {
  771. pdp_temp[i] = scratch[PDP_val].dv /
  772. ((double) (pdp_diff - scratch[PDP_unkn_sec_cnt].lv));
  773. }
  774. if (isnan(pdp_new[i])) {
  775. scratch[PDP_unkn_sec_cnt].lv = interval;
  776. scratch[PDP_val].dv = NAN;
  777. }
  778. else {
  779. scratch[PDP_unkn_sec_cnt].lv = 0;
  780. scratch[PDP_val].dv = pdp_new[i] / interval;
  781. }
  782. msg_debug_rrd("new temp PDP %ud, %.3f -> %.3f, scratch: %3f",
  783. i, pdp_new[i], pdp_temp[i],
  784. scratch[PDP_val].dv);
  785. }
  786. }
  787. /**
  788. * Update CDP for this rra
  789. * @param file rrd file
  790. * @param pdp_steps how much pdp steps elapsed from the last update
  791. * @param pdp_offset offset from pdp
  792. * @param rra_steps how much steps must be updated for this rra
  793. * @param rra_index index of desired rra
  794. * @param pdp_temp temporary pdp points
  795. */
  796. static void
  797. rspamd_rrd_update_cdp(struct rspamd_rrd_file *file,
  798. double pdp_steps,
  799. double pdp_offset,
  800. gulong *rra_steps,
  801. gulong rra_index,
  802. double *pdp_temp)
  803. {
  804. unsigned int i;
  805. struct rrd_rra_def *rra;
  806. rrd_value_t *scratch;
  807. enum rrd_cf_type cf;
  808. double last_cdp = INFINITY, cur_cdp = INFINITY;
  809. gulong pdp_in_cdp;
  810. rra = &file->rra_def[rra_index];
  811. cf = rrd_cf_from_string(rra->cf_nam);
  812. /* Iterate over all DS for this RRA */
  813. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  814. /* Get CDP for this RRA and DS */
  815. scratch =
  816. file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch;
  817. if (rra->pdp_cnt > 1) {
  818. /* Do we have any CDP to update for this rra ? */
  819. if (rra_steps[rra_index] > 0) {
  820. if (isnan(pdp_temp[i])) {
  821. /* New pdp is nan */
  822. /* Increment unknown points count */
  823. scratch[CDP_unkn_pdp_cnt].lv += pdp_offset;
  824. /* Reset secondary value */
  825. scratch[CDP_secondary_val].dv = NAN;
  826. }
  827. else {
  828. scratch[CDP_secondary_val].dv = pdp_temp[i];
  829. }
  830. /* Check XFF for this rra */
  831. if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt *
  832. rra->par[RRA_cdp_xff_val].lv) {
  833. /* XFF is reached */
  834. scratch[CDP_primary_val].dv = NAN;
  835. }
  836. else {
  837. /* Need to initialize CDP using specified consolidation */
  838. switch (cf) {
  839. case RRD_CF_AVERAGE:
  840. last_cdp =
  841. isnan(scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val].dv;
  842. cur_cdp = isnan(pdp_temp[i]) ? 0.0 : pdp_temp[i];
  843. scratch[CDP_primary_val].dv =
  844. (last_cdp + cur_cdp *
  845. pdp_offset) /
  846. (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv);
  847. break;
  848. case RRD_CF_MAXIMUM:
  849. last_cdp =
  850. isnan(scratch[CDP_val].dv) ? -INFINITY : scratch[CDP_val].dv;
  851. cur_cdp = isnan(pdp_temp[i]) ? -INFINITY : pdp_temp[i];
  852. scratch[CDP_primary_val].dv = MAX(last_cdp, cur_cdp);
  853. break;
  854. case RRD_CF_MINIMUM:
  855. last_cdp =
  856. isnan(scratch[CDP_val].dv) ? INFINITY : scratch[CDP_val].dv;
  857. cur_cdp = isnan(pdp_temp[i]) ? INFINITY : pdp_temp[i];
  858. scratch[CDP_primary_val].dv = MIN(last_cdp, cur_cdp);
  859. break;
  860. case RRD_CF_LAST:
  861. default:
  862. scratch[CDP_primary_val].dv = pdp_temp[i];
  863. last_cdp = INFINITY;
  864. break;
  865. }
  866. }
  867. /* Init carry of this CDP */
  868. pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt;
  869. if (pdp_in_cdp == 0 || isnan(pdp_temp[i])) {
  870. /* Set overflow */
  871. switch (cf) {
  872. case RRD_CF_AVERAGE:
  873. scratch[CDP_val].dv = 0;
  874. break;
  875. case RRD_CF_MAXIMUM:
  876. scratch[CDP_val].dv = -INFINITY;
  877. break;
  878. case RRD_CF_MINIMUM:
  879. scratch[CDP_val].dv = INFINITY;
  880. break;
  881. default:
  882. scratch[CDP_val].dv = NAN;
  883. break;
  884. }
  885. }
  886. else {
  887. /* Special carry for average */
  888. if (cf == RRD_CF_AVERAGE) {
  889. scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp;
  890. }
  891. else {
  892. scratch[CDP_val].dv = pdp_temp[i];
  893. }
  894. }
  895. scratch[CDP_unkn_pdp_cnt].lv = 0;
  896. msg_debug_rrd("update cdp for DS %d with value %.3f, "
  897. "stored value: %.3f, carry: %.3f",
  898. i, last_cdp,
  899. scratch[CDP_primary_val].dv, scratch[CDP_val].dv);
  900. }
  901. /* In this case we just need to update cdp_prep for this RRA */
  902. else {
  903. if (isnan(pdp_temp[i])) {
  904. /* Just increase undefined zone */
  905. scratch[CDP_unkn_pdp_cnt].lv += pdp_steps;
  906. }
  907. else {
  908. /* Calculate cdp value */
  909. last_cdp = scratch[CDP_val].dv;
  910. switch (cf) {
  911. case RRD_CF_AVERAGE:
  912. if (isnan(last_cdp)) {
  913. scratch[CDP_val].dv = pdp_temp[i] * pdp_steps;
  914. }
  915. else {
  916. scratch[CDP_val].dv = last_cdp + pdp_temp[i] *
  917. pdp_steps;
  918. }
  919. break;
  920. case RRD_CF_MAXIMUM:
  921. scratch[CDP_val].dv = MAX(last_cdp, pdp_temp[i]);
  922. break;
  923. case RRD_CF_MINIMUM:
  924. scratch[CDP_val].dv = MIN(last_cdp, pdp_temp[i]);
  925. break;
  926. case RRD_CF_LAST:
  927. scratch[CDP_val].dv = pdp_temp[i];
  928. break;
  929. default:
  930. scratch[CDP_val].dv = NAN;
  931. break;
  932. }
  933. }
  934. msg_debug_rrd("aggregate cdp %d with pdp %.3f, "
  935. "stored value: %.3f",
  936. i, pdp_temp[i], scratch[CDP_val].dv);
  937. }
  938. }
  939. else {
  940. /* We have nothing to consolidate, but we may miss some pdp */
  941. if (pdp_steps > 2) {
  942. /* Just write PDP value */
  943. scratch[CDP_primary_val].dv = pdp_temp[i];
  944. scratch[CDP_secondary_val].dv = pdp_temp[i];
  945. }
  946. }
  947. }
  948. }
  949. /**
  950. * Update RRA in a file
  951. * @param file rrd file
  952. * @param rra_steps steps for each rra
  953. * @param now current time
  954. */
  955. void rspamd_rrd_write_rra(struct rspamd_rrd_file *file, gulong *rra_steps)
  956. {
  957. unsigned int i, j, ds_cnt;
  958. struct rrd_rra_def *rra;
  959. struct rrd_cdp_prep *cdp;
  960. double *rra_row = file->rrd_value, *cur_row;
  961. ds_cnt = file->stat_head->ds_cnt;
  962. /* Iterate over all RRA */
  963. for (i = 0; i < file->stat_head->rra_cnt; i++) {
  964. rra = &file->rra_def[i];
  965. if (rra_steps[i] > 0) {
  966. /* Move row ptr */
  967. if (++file->rra_ptr[i].cur_row >= rra->row_cnt) {
  968. file->rra_ptr[i].cur_row = 0;
  969. }
  970. /* Calculate seek */
  971. cdp = &file->cdp_prep[ds_cnt * i];
  972. cur_row = rra_row + ds_cnt * file->rra_ptr[i].cur_row;
  973. /* Iterate over DS */
  974. for (j = 0; j < ds_cnt; j++) {
  975. cur_row[j] = cdp[j].scratch[CDP_primary_val].dv;
  976. msg_debug_rrd("write cdp %d: %.3f", j, cur_row[j]);
  977. }
  978. }
  979. rra_row += rra->row_cnt * ds_cnt;
  980. }
  981. }
  982. /**
  983. * Add record to rrd file
  984. * @param file rrd file object
  985. * @param points points (must be row suitable for this RRA, depending on ds count)
  986. * @param err error pointer
  987. * @return TRUE if a row has been added
  988. */
  989. gboolean
  990. rspamd_rrd_add_record(struct rspamd_rrd_file *file,
  991. GArray *points,
  992. double ticks,
  993. GError **err)
  994. {
  995. double interval, *pdp_new, *pdp_temp;
  996. unsigned int i;
  997. glong seconds, microseconds;
  998. gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
  999. prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
  1000. if (file == NULL || file->stat_head->ds_cnt * sizeof(double) !=
  1001. points->len) {
  1002. g_set_error(err,
  1003. rrd_error_quark(), EINVAL,
  1004. "rrd add points failed: wrong arguments");
  1005. return FALSE;
  1006. }
  1007. /* Get interval */
  1008. seconds = (glong) ticks;
  1009. microseconds = (glong) ((ticks - seconds) * 1000000.);
  1010. interval = ticks - ((double) file->live_head->last_up +
  1011. file->live_head->last_up_usec / 1000000.);
  1012. msg_debug_rrd("update rrd record after %.3f seconds", interval);
  1013. /* Update PDP preparation values */
  1014. pdp_new = g_malloc0(sizeof(double) * file->stat_head->ds_cnt);
  1015. pdp_temp = g_malloc0(sizeof(double) * file->stat_head->ds_cnt);
  1016. /* How much steps need to be updated in each RRA */
  1017. rra_steps = g_malloc0(sizeof(gulong) * file->stat_head->rra_cnt);
  1018. if (!rspamd_rrd_update_pdp_prep(file, (double *) points->data, pdp_new,
  1019. interval)) {
  1020. g_set_error(err,
  1021. rrd_error_quark(), EINVAL,
  1022. "rrd update pdp failed: wrong arguments");
  1023. g_free(pdp_new);
  1024. g_free(pdp_temp);
  1025. g_free(rra_steps);
  1026. return FALSE;
  1027. }
  1028. /* Calculate elapsed steps */
  1029. /* Age in seconds for previous pdp store */
  1030. prev_pdp_age = file->live_head->last_up % file->stat_head->pdp_step;
  1031. /* Time in seconds for last pdp update */
  1032. prev_pdp_step = file->live_head->last_up - prev_pdp_age;
  1033. /* Age in seconds from current time to required pdp time */
  1034. cur_pdp_age = seconds % file->stat_head->pdp_step;
  1035. /* Time of desired pdp step */
  1036. cur_pdp_step = seconds - cur_pdp_age;
  1037. cur_pdp_count = cur_pdp_step / file->stat_head->pdp_step;
  1038. pdp_steps = (cur_pdp_step - prev_pdp_step) / file->stat_head->pdp_step;
  1039. if (pdp_steps == 0) {
  1040. /* Simple update of pdp prep */
  1041. for (i = 0; i < file->stat_head->ds_cnt; i++) {
  1042. if (isnan(pdp_new[i])) {
  1043. /* Increment unknown period */
  1044. file->pdp_prep[i].scratch[PDP_unkn_sec_cnt].lv += floor(
  1045. interval);
  1046. }
  1047. else {
  1048. if (isnan(file->pdp_prep[i].scratch[PDP_val].dv)) {
  1049. /* Reset pdp to the current value */
  1050. file->pdp_prep[i].scratch[PDP_val].dv = pdp_new[i];
  1051. }
  1052. else {
  1053. /* Increment pdp value */
  1054. file->pdp_prep[i].scratch[PDP_val].dv += pdp_new[i];
  1055. }
  1056. }
  1057. }
  1058. }
  1059. else {
  1060. /* Complex update of PDP, CDP and RRA */
  1061. /* Update PDP for this step */
  1062. rspamd_rrd_update_pdp_step(file,
  1063. pdp_new,
  1064. pdp_temp,
  1065. interval,
  1066. pdp_steps * file->stat_head->pdp_step);
  1067. /* Update CDP points for each RRA*/
  1068. for (i = 0; i < file->stat_head->rra_cnt; i++) {
  1069. /* Calculate pdp offset for this RRA */
  1070. pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count %
  1071. file->rra_def[i].pdp_cnt;
  1072. /* How much steps we got for this RRA */
  1073. if (pdp_offset <= pdp_steps) {
  1074. rra_steps[i] =
  1075. (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1;
  1076. }
  1077. else {
  1078. /* This rra have not passed enough pdp steps */
  1079. rra_steps[i] = 0;
  1080. }
  1081. msg_debug_rrd("cdp: %ud, rra steps: %ul(%ul), pdp steps: %ul",
  1082. i, rra_steps[i], pdp_offset, pdp_steps);
  1083. /* Update this specific CDP */
  1084. rspamd_rrd_update_cdp(file,
  1085. pdp_steps,
  1086. pdp_offset,
  1087. rra_steps,
  1088. i,
  1089. pdp_temp);
  1090. }
  1091. /* Write RRA */
  1092. rspamd_rrd_write_rra(file, rra_steps);
  1093. }
  1094. file->live_head->last_up = seconds;
  1095. file->live_head->last_up_usec = microseconds;
  1096. /* Sync and invalidate */
  1097. msync(file->map, file->size, MS_ASYNC | MS_INVALIDATE);
  1098. g_free(pdp_new);
  1099. g_free(pdp_temp);
  1100. g_free(rra_steps);
  1101. return TRUE;
  1102. }
  1103. /**
  1104. * Close rrd file
  1105. * @param file
  1106. * @return
  1107. */
  1108. int rspamd_rrd_close(struct rspamd_rrd_file *file)
  1109. {
  1110. if (file == NULL) {
  1111. errno = EINVAL;
  1112. return -1;
  1113. }
  1114. munmap(file->map, file->size);
  1115. close(file->fd);
  1116. g_free(file->filename);
  1117. g_free(file->id);
  1118. g_free(file);
  1119. return 0;
  1120. }
  1121. static struct rspamd_rrd_file *
  1122. rspamd_rrd_create_file(const char *path, gboolean finalize, GError **err)
  1123. {
  1124. struct rspamd_rrd_file *file;
  1125. struct rrd_ds_def ds[RSPAMD_RRD_DS_COUNT];
  1126. struct rrd_rra_def rra[RSPAMD_RRD_RRA_COUNT];
  1127. int i;
  1128. GArray ar;
  1129. /* Try to create new rrd file */
  1130. file = rspamd_rrd_create(path, RSPAMD_RRD_DS_COUNT, RSPAMD_RRD_RRA_COUNT,
  1131. 1, rspamd_get_calendar_ticks(), err);
  1132. if (file == NULL) {
  1133. return NULL;
  1134. }
  1135. /* Create DS and RRA */
  1136. for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i++) {
  1137. rrd_make_default_ds(rspamd_action_to_str(i),
  1138. rrd_dst_to_string(RRD_DST_COUNTER), 1, &ds[i]);
  1139. }
  1140. ar.data = (char *) ds;
  1141. ar.len = sizeof(ds);
  1142. if (!rspamd_rrd_add_ds(file, &ar, err)) {
  1143. rspamd_rrd_close(file);
  1144. return NULL;
  1145. }
  1146. /* Once per minute for 1 day */
  1147. rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
  1148. 60, 24 * 60, &rra[0]);
  1149. /* Once per 5 minutes for 1 week */
  1150. rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
  1151. 5 * 60, 7 * 24 * 60 / 5, &rra[1]);
  1152. /* Once per 10 mins for 1 month */
  1153. rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
  1154. 60 * 10, 30 * 24 * 6, &rra[2]);
  1155. /* Once per hour for 1 year */
  1156. rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
  1157. 60 * 60, 365 * 24, &rra[3]);
  1158. ar.data = (char *) rra;
  1159. ar.len = sizeof(rra);
  1160. if (!rspamd_rrd_add_rra(file, &ar, err)) {
  1161. rspamd_rrd_close(file);
  1162. return NULL;
  1163. }
  1164. if (finalize && !rspamd_rrd_finalize(file, err)) {
  1165. rspamd_rrd_close(file);
  1166. return NULL;
  1167. }
  1168. return file;
  1169. }
  1170. static void
  1171. rspamd_rrd_convert_ds(struct rspamd_rrd_file *old,
  1172. struct rspamd_rrd_file *cur, int idx_old, int idx_new)
  1173. {
  1174. struct rrd_pdp_prep *pdp_prep_old, *pdp_prep_new;
  1175. struct rrd_cdp_prep *cdp_prep_old, *cdp_prep_new;
  1176. double *val_old, *val_new;
  1177. gulong rra_cnt, i, j, points_cnt, old_ds, new_ds;
  1178. rra_cnt = old->stat_head->rra_cnt;
  1179. pdp_prep_old = &old->pdp_prep[idx_old];
  1180. pdp_prep_new = &cur->pdp_prep[idx_new];
  1181. memcpy(pdp_prep_new, pdp_prep_old, sizeof(*pdp_prep_new));
  1182. val_old = old->rrd_value;
  1183. val_new = cur->rrd_value;
  1184. old_ds = old->stat_head->ds_cnt;
  1185. new_ds = cur->stat_head->ds_cnt;
  1186. for (i = 0; i < rra_cnt; i++) {
  1187. cdp_prep_old = &old->cdp_prep[i * old_ds] + idx_old;
  1188. cdp_prep_new = &cur->cdp_prep[i * new_ds] + idx_new;
  1189. memcpy(cdp_prep_new, cdp_prep_old, sizeof(*cdp_prep_new));
  1190. points_cnt = old->rra_def[i].row_cnt;
  1191. for (j = 0; j < points_cnt; j++) {
  1192. val_new[j * new_ds + idx_new] = val_old[j * old_ds + idx_old];
  1193. }
  1194. val_new += points_cnt * new_ds;
  1195. val_old += points_cnt * old_ds;
  1196. }
  1197. }
  1198. static struct rspamd_rrd_file *
  1199. rspamd_rrd_convert(const char *path, struct rspamd_rrd_file *old,
  1200. GError **err)
  1201. {
  1202. struct rspamd_rrd_file *rrd;
  1203. char tpath[PATH_MAX];
  1204. g_assert(old != NULL);
  1205. rspamd_snprintf(tpath, sizeof(tpath), "%s.new", path);
  1206. rrd = rspamd_rrd_create_file(tpath, TRUE, err);
  1207. if (rrd) {
  1208. /* Copy old data */
  1209. memcpy(rrd->live_head, old->live_head, sizeof(*rrd->live_head));
  1210. memcpy(rrd->rra_ptr, old->rra_ptr,
  1211. sizeof(*old->rra_ptr) * rrd->stat_head->rra_cnt);
  1212. /*
  1213. * Old DSes:
  1214. * 0 - spam -> reject
  1215. * 1 - probable spam -> add header
  1216. * 2 - greylist -> greylist
  1217. * 3 - ham -> ham
  1218. */
  1219. rspamd_rrd_convert_ds(old, rrd, 0, METRIC_ACTION_REJECT);
  1220. rspamd_rrd_convert_ds(old, rrd, 1, METRIC_ACTION_ADD_HEADER);
  1221. rspamd_rrd_convert_ds(old, rrd, 2, METRIC_ACTION_GREYLIST);
  1222. rspamd_rrd_convert_ds(old, rrd, 3, METRIC_ACTION_NOACTION);
  1223. if (unlink(path) == -1) {
  1224. g_set_error(err, rrd_error_quark(), errno, "cannot unlink old rrd file %s: %s",
  1225. path, strerror(errno));
  1226. unlink(tpath);
  1227. rspamd_rrd_close(rrd);
  1228. return NULL;
  1229. }
  1230. if (rename(tpath, path) == -1) {
  1231. g_set_error(err, rrd_error_quark(), errno, "cannot rename old rrd file %s: %s",
  1232. path, strerror(errno));
  1233. unlink(tpath);
  1234. rspamd_rrd_close(rrd);
  1235. return NULL;
  1236. }
  1237. }
  1238. return rrd;
  1239. }
  1240. struct rspamd_rrd_file *
  1241. rspamd_rrd_file_default(const char *path,
  1242. GError **err)
  1243. {
  1244. struct rspamd_rrd_file *file, *nf;
  1245. g_assert(path != NULL);
  1246. if (access(path, R_OK) != -1) {
  1247. /* We can open rrd file */
  1248. file = rspamd_rrd_open(path, err);
  1249. if (file == NULL) {
  1250. return NULL;
  1251. }
  1252. if (file->stat_head->rra_cnt != RSPAMD_RRD_RRA_COUNT) {
  1253. msg_err_rrd("rrd file is not suitable for rspamd: it has "
  1254. "%ul ds and %ul rra",
  1255. file->stat_head->ds_cnt,
  1256. file->stat_head->rra_cnt);
  1257. g_set_error(err, rrd_error_quark(), EINVAL, "bad rrd file");
  1258. rspamd_rrd_close(file);
  1259. return NULL;
  1260. }
  1261. else if (file->stat_head->ds_cnt == RSPAMD_RRD_OLD_DS_COUNT) {
  1262. /* Old rrd, need to convert */
  1263. msg_info_rrd("rrd file %s is not suitable for rspamd, convert it",
  1264. path);
  1265. nf = rspamd_rrd_convert(path, file, err);
  1266. rspamd_rrd_close(file);
  1267. return nf;
  1268. }
  1269. else if (file->stat_head->ds_cnt == RSPAMD_RRD_DS_COUNT) {
  1270. return file;
  1271. }
  1272. else {
  1273. msg_err_rrd("rrd file is not suitable for rspamd: it has "
  1274. "%ul ds and %ul rra",
  1275. file->stat_head->ds_cnt,
  1276. file->stat_head->rra_cnt);
  1277. g_set_error(err, rrd_error_quark(), EINVAL, "bad rrd file");
  1278. rspamd_rrd_close(file);
  1279. return NULL;
  1280. }
  1281. }
  1282. file = rspamd_rrd_create_file(path, TRUE, err);
  1283. return file;
  1284. }
  1285. struct rspamd_rrd_query_result *
  1286. rspamd_rrd_query(struct rspamd_rrd_file *file,
  1287. gulong rra_num)
  1288. {
  1289. struct rspamd_rrd_query_result *res;
  1290. struct rrd_rra_def *rra;
  1291. const double *rra_offset = NULL;
  1292. unsigned int i;
  1293. g_assert(file != NULL);
  1294. if (rra_num > file->stat_head->rra_cnt) {
  1295. msg_err_rrd("requested unexisting rra: %l", rra_num);
  1296. return NULL;
  1297. }
  1298. res = g_malloc0(sizeof(*res));
  1299. res->ds_count = file->stat_head->ds_cnt;
  1300. res->last_update = (double) file->live_head->last_up +
  1301. ((double) file->live_head->last_up_usec / 1e6f);
  1302. res->pdp_per_cdp = file->rra_def[rra_num].pdp_cnt;
  1303. res->rra_rows = file->rra_def[rra_num].row_cnt;
  1304. rra_offset = file->rrd_value;
  1305. for (i = 0; i < file->stat_head->rra_cnt; i++) {
  1306. rra = &file->rra_def[i];
  1307. if (i == rra_num) {
  1308. res->cur_row = file->rra_ptr[i].cur_row % rra->row_cnt;
  1309. break;
  1310. }
  1311. rra_offset += rra->row_cnt * res->ds_count;
  1312. }
  1313. res->data = rra_offset;
  1314. return res;
  1315. }