aboutsummaryrefslogtreecommitdiffstats
path: root/src/rrd.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-04-21 16:25:51 +0100
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-04-21 16:25:51 +0100
commit61555065f3d1c8badcc9573691232f1b6e42988c (patch)
tree563d5b7cb8c468530f7e79c4da0a75267b1184e1 /src/rrd.c
parentad5bf825b7f33bc10311673991f0cc888e69c0b1 (diff)
downloadrspamd-61555065f3d1c8badcc9573691232f1b6e42988c.tar.gz
rspamd-61555065f3d1c8badcc9573691232f1b6e42988c.zip
Rework project structure, remove trash files.
Diffstat (limited to 'src/rrd.c')
-rw-r--r--src/rrd.c1015
1 files changed, 0 insertions, 1015 deletions
diff --git a/src/rrd.c b/src/rrd.c
deleted file mode 100644
index a0e21eaed..000000000
--- a/src/rrd.c
+++ /dev/null
@@ -1,1015 +0,0 @@
-/* Copyright (c) 2010-2012, Vsevolod Stakhov
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
- * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
- * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "config.h"
-#include "rrd.h"
-#include "util.h"
-
-static GQuark
-rrd_error_quark (void)
-{
- return g_quark_from_static_string ("rrd-error");
-}
-
-/**
- * Convert rrd dst type from string to numeric value
- */
-enum rrd_dst_type
-rrd_dst_from_string (const gchar *str)
-{
- if (g_ascii_strcasecmp (str, "counter") == 0) {
- return RRD_DST_COUNTER;
- }
- else if (g_ascii_strcasecmp (str, "absolute") == 0) {
- return RRD_DST_ABSOLUTE;
- }
- else if (g_ascii_strcasecmp (str, "gauge") == 0) {
- return RRD_DST_GAUGE;
- }
- else if (g_ascii_strcasecmp (str, "cdef") == 0) {
- return RRD_DST_CDEF;
- }
- else if (g_ascii_strcasecmp (str, "derive") == 0) {
- return RRD_DST_DERIVE;
- }
- return -1;
-}
-
-/**
- * Convert numeric presentation of dst to string
- */
-const gchar*
-rrd_dst_to_string (enum rrd_dst_type type)
-{
- switch (type) {
- case RRD_DST_COUNTER:
- return "COUNTER";
- case RRD_DST_ABSOLUTE:
- return "ABSOLUTE";
- case RRD_DST_GAUGE:
- return "GAUGE";
- case RRD_DST_CDEF:
- return "CDEF";
- case RRD_DST_DERIVE:
- return "DERIVE";
- default:
- return "U";
- }
-
- return "U";
-}
-
-/**
- * Convert rrd consolidation function type from string to numeric value
- */
-enum rrd_cf_type
-rrd_cf_from_string (const gchar *str)
-{
- if (g_ascii_strcasecmp (str, "average") == 0) {
- return RRD_CF_AVERAGE;
- }
- else if (g_ascii_strcasecmp (str, "minimum") == 0) {
- return RRD_CF_MINIMUM;
- }
- else if (g_ascii_strcasecmp (str, "maximum") == 0) {
- return RRD_CF_MAXIMUM;
- }
- else if (g_ascii_strcasecmp (str, "last") == 0) {
- return RRD_CF_LAST;
- }
- /* XXX: add other CF functions supported by rrd */
-
- return -1;
-}
-
-/**
- * Convert numeric presentation of cf to string
- */
-const gchar*
-rrd_cf_to_string (enum rrd_cf_type type)
-{
- switch (type) {
- case RRD_CF_AVERAGE:
- return "AVERAGE";
- case RRD_CF_MINIMUM:
- return "MINIMUM";
- case RRD_CF_MAXIMUM:
- return "MAXIMUM";
- case RRD_CF_LAST:
- return "LAST";
- default:
- return "U";
- }
-
- /* XXX: add other CF functions supported by rrd */
-
- return "U";
-}
-
-void
-rrd_make_default_rra (const gchar *cf_name, gulong pdp_cnt, gulong rows, struct rrd_rra_def *rra)
-{
- rra->pdp_cnt = pdp_cnt;
- rra->row_cnt = rows;
- rspamd_strlcpy (rra->cf_nam, cf_name, sizeof (rra->cf_nam));
- memset (rra->par, 0, sizeof (rra->par));
- rra->par[RRA_cdp_xff_val].dv = 0.5;
-}
-
-void
-rrd_make_default_ds (const gchar *name, gulong pdp_step, struct rrd_ds_def *ds)
-{
- rspamd_strlcpy (ds->ds_nam, name, sizeof (ds->ds_nam));
- rspamd_strlcpy (ds->dst, "COUNTER", sizeof (ds->dst));
- memset (ds->par, 0, sizeof (ds->par));
- ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2;
- ds->par[RRD_DS_min_val].dv = NAN;
- ds->par[RRD_DS_max_val].dv = NAN;
-}
-
-/**
- * Check rrd file for correctness (size, cookies, etc)
- */
-static gboolean
-rspamd_rrd_check_file (const gchar *filename, gboolean need_data, GError **err)
-{
- gint fd, i;
- struct stat st;
- struct rrd_file_head head;
- struct rrd_rra_def rra;
- gint head_size;
-
- fd = open (filename, O_RDWR);
- if (fd == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
- return FALSE;
- }
-
- if (fstat (fd, &st) == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
- if (st.st_size < (goffset)sizeof (struct rrd_file_head)) {
- /* We have trimmed file */
- g_set_error (err, rrd_error_quark (), EINVAL, "rrd size is bad: %ud", (guint)st.st_size);
- close (fd);
- return FALSE;
- }
-
- /* Try to read header */
- if (read (fd, &head, sizeof (head)) != sizeof (head)) {
- g_set_error (err, rrd_error_quark (), errno, "rrd read head error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
- /* Check magic */
- if (memcmp (head.cookie, RRD_COOKIE, sizeof (head.cookie)) != 0 ||
- memcmp (head.version, RRD_VERSION, sizeof (head.version)) != 0 ||
- head.float_cookie != RRD_FLOAT_COOKIE) {
- g_set_error (err, rrd_error_quark (), EINVAL, "rrd head cookies error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
- /* Check for other params */
- if (head.ds_cnt <= 0 || head.rra_cnt <= 0) {
- g_set_error (err, rrd_error_quark (), EINVAL, "rrd head cookies error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
- /* Now we can calculate the overall size of rrd */
- head_size = sizeof (struct rrd_file_head) +
- sizeof (struct rrd_ds_def) * head.ds_cnt +
- sizeof (struct rrd_rra_def) * head.rra_cnt +
- sizeof (struct rrd_live_head) +
- sizeof (struct rrd_pdp_prep) * head.ds_cnt +
- sizeof (struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt +
- sizeof (struct rrd_rra_ptr) * head.rra_cnt;
- if (st.st_size < (goffset)head_size) {
- g_set_error (err, rrd_error_quark (), errno, "rrd file seems to have stripped header: %d", head_size);
- close (fd);
- return FALSE;
- }
-
- if (need_data) {
- /* Now check rra */
- if (lseek (fd, sizeof (struct rrd_ds_def) * head.ds_cnt, SEEK_CUR) == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd head lseek error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
- for (i = 0; i < (gint)head.rra_cnt; i ++) {
- if (read (fd, &rra, sizeof (rra)) != sizeof (rra)) {
- g_set_error (err, rrd_error_quark (), errno, "rrd read rra error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
- head_size += rra.row_cnt * head.ds_cnt * sizeof (gdouble);
- }
-
- if (st.st_size != head_size) {
- g_set_error (err, rrd_error_quark (), EINVAL, "rrd file seems to have incorrect size: %d, must be %d", (gint)st.st_size, head_size);
- close (fd);
- return FALSE;
- }
- }
-
- close (fd);
- return TRUE;
-}
-
-/**
- * Adjust pointers in mmapped rrd file
- * @param file
- */
-static void
-rspamd_rrd_adjust_pointers (struct rspamd_rrd_file *file, gboolean completed)
-{
- guint8 *ptr;
-
- ptr = file->map;
- file->stat_head = (struct rrd_file_head *)ptr;
- ptr += sizeof (struct rrd_file_head);
- file->ds_def = (struct rrd_ds_def *)ptr;
- ptr += sizeof (struct rrd_ds_def) * file->stat_head->ds_cnt;
- file->rra_def = (struct rrd_rra_def *)ptr;
- ptr += sizeof (struct rrd_rra_def) * file->stat_head->rra_cnt;
- file->live_head = (struct rrd_live_head *)ptr;
- ptr += sizeof (struct rrd_live_head);
- file->pdp_prep = (struct rrd_pdp_prep *)ptr;
- ptr += sizeof (struct rrd_pdp_prep) * file->stat_head->ds_cnt;
- file->cdp_prep = (struct rrd_cdp_prep *)ptr;
- ptr += sizeof (struct rrd_cdp_prep) * file->stat_head->rra_cnt * file->stat_head->ds_cnt;
- file->rra_ptr = (struct rrd_rra_ptr *)ptr;
- if (completed) {
- ptr += sizeof (struct rrd_rra_ptr) * file->stat_head->rra_cnt;
- file->rrd_value = (gdouble *)ptr;
- }
- else {
- file->rrd_value = NULL;
- }
-}
-
-/**
- * Open completed or incompleted rrd file
- * @param filename
- * @param completed
- * @param err
- * @return
- */
-static struct rspamd_rrd_file*
-rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err)
-{
- struct rspamd_rrd_file *new;
- gint fd;
- struct stat st;
-
- if (!rspamd_rrd_check_file (filename, completed, err)) {
- return NULL;
- }
-
- new = g_slice_alloc0 (sizeof (struct rspamd_rrd_file));
-
- if (new == NULL) {
- g_set_error (err, rrd_error_quark (), ENOMEM, "not enough memory");
- return NULL;
- }
-
- /* Open file */
- fd = open (filename, O_RDWR);
- if (fd == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
- return FALSE;
- }
-
- if (fstat (fd, &st) == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
- /* Mmap file */
- new->size = st.st_size;
- if ((new->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) {
- close (fd);
- g_set_error (err, rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
- g_slice_free1 (sizeof (struct rspamd_rrd_file), new);
- return NULL;
- }
-
- close (fd);
-
- /* Adjust pointers */
- rspamd_rrd_adjust_pointers (new, completed);
-
- /* Mark it as finalized */
- new->finalized = completed;
-
- new->filename = g_strdup (filename);
-
- return new;
-}
-
-/**
- * Open (and mmap) existing RRD file
- * @param filename path
- * @param err error pointer
- * @return rrd file structure
- */
-struct rspamd_rrd_file*
-rspamd_rrd_open (const gchar *filename, GError **err)
-{
- return rspamd_rrd_open_common (filename, TRUE, err);
-}
-
-/**
- * Create basic header for rrd file
- * @param filename file path
- * @param ds_count number of data sources
- * @param rra_count number of round robin archives
- * @param pdp_step step of primary data points
- * @param err error pointer
- * @return TRUE if file has been created
- */
-struct rspamd_rrd_file*
-rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err)
-{
- struct rspamd_rrd_file *new;
- struct rrd_file_head head;
- struct rrd_ds_def ds;
- struct rrd_rra_def rra;
- struct rrd_live_head lh;
- struct rrd_pdp_prep pdp;
- struct rrd_cdp_prep cdp;
- struct rrd_rra_ptr rra_ptr;
- gint fd;
- guint i, j;
- struct timeval tv;
-
- /* Open file */
- fd = open (filename, O_RDWR | O_CREAT | O_TRUNC, 0644);
- if (fd == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd create error: %s", strerror (errno));
- return NULL;
- }
-
- /* Fill header */
- memset (&head, 0, sizeof (head));
- head.rra_cnt = rra_count;
- head.ds_cnt = ds_count;
- head.pdp_step = pdp_step;
- memcpy (head.cookie, RRD_COOKIE, sizeof (head.cookie));
- memcpy (head.version, RRD_VERSION, sizeof (head.version));
- head.float_cookie = RRD_FLOAT_COOKIE;
-
- if (write (fd, &head, sizeof (head)) != sizeof (head)) {
- close (fd);
- g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
- return NULL;
- }
-
- /* Fill DS section */
- memset (&ds.ds_nam, 0, sizeof (ds.ds_nam));
- memcpy (&ds.dst, "COUNTER", sizeof ("COUNTER"));
- memset (&ds.par, 0, sizeof (ds.par));
- for (i = 0; i < ds_count; i ++) {
- if (write (fd, &ds, sizeof (ds)) != sizeof (ds)) {
- close (fd);
- g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
- return NULL;
- }
- }
-
- /* Fill RRA section */
- memcpy (&rra.cf_nam, "AVERAGE", sizeof ("AVERAGE"));
- rra.pdp_cnt = 1;
- memset (&rra.par, 0, sizeof (rra.par));
- for (i = 0; i < rra_count; i ++) {
- if (write (fd, &rra, sizeof (rra)) != sizeof (rra)) {
- close (fd);
- g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
- return NULL;
- }
- }
-
- /* Fill live header */
- gettimeofday (&tv, NULL);
- lh.last_up = tv.tv_sec;
- lh.last_up_usec = tv.tv_usec;
-
- if (write (fd, &lh, sizeof (lh)) != sizeof (lh)) {
- close (fd);
- g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
- return NULL;
- }
-
- /* Fill pdp prep */
- memcpy (&pdp.last_ds, "U", sizeof ("U"));
- memset (&pdp.scratch, 0, sizeof (pdp.scratch));
- pdp.scratch[PDP_val].dv = 0.;
- pdp.scratch[PDP_unkn_sec_cnt].lv = 0;
- for (i = 0; i < ds_count; i ++) {
- if (write (fd, &pdp, sizeof (pdp)) != sizeof (pdp)) {
- close (fd);
- g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
- return NULL;
- }
- }
-
- /* Fill cdp prep */
- memset (&cdp.scratch, 0, sizeof (cdp.scratch));
- cdp.scratch[CDP_val].dv = NAN;
- for (i = 0; i < rra_count; i ++) {
- cdp.scratch[CDP_unkn_pdp_cnt].lv = 0;
- for (j = 0; j < ds_count; j ++) {
- if (write (fd, &cdp, sizeof (cdp)) != sizeof (cdp)) {
- close (fd);
- g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
- return NULL;
- }
- }
- }
-
- /* Set row pointers */
- memset (&rra_ptr, 0, sizeof (rra_ptr));
- for (i = 0; i < rra_count; i ++) {
- if (write (fd, &rra_ptr, sizeof (rra_ptr)) != sizeof (rra_ptr)) {
- close (fd);
- g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
- return NULL;
- }
- }
-
- close (fd);
- new = rspamd_rrd_open_common (filename, FALSE, err);
-
- return new;
-}
-
-/**
- * Add data sources to rrd file
- * @param filename path to file
- * @param ds array of struct rrd_ds_def
- * @param err error pointer
- * @return TRUE if data sources were added
- */
-gboolean
-rspamd_rrd_add_ds (struct rspamd_rrd_file *file, GArray *ds, GError **err)
-{
-
- if (file == NULL || file->stat_head->ds_cnt * sizeof (struct rrd_ds_def) != ds->len) {
- g_set_error (err, rrd_error_quark (), EINVAL, "rrd add ds failed: wrong arguments");
- return FALSE;
- }
-
- /* Straightforward memcpy */
- memcpy (file->ds_def, ds->data, ds->len);
-
- return TRUE;
-}
-
-/**
- * Add round robin archives to rrd file
- * @param filename path to file
- * @param ds array of struct rrd_rra_def
- * @param err error pointer
- * @return TRUE if archives were added
- */
-gboolean
-rspamd_rrd_add_rra (struct rspamd_rrd_file *file, GArray *rra, GError **err)
-{
- if (file == NULL || file->stat_head->rra_cnt * sizeof (struct rrd_rra_def) != rra->len) {
- g_set_error (err, rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
- return FALSE;
- }
-
- /* Straightforward memcpy */
- memcpy (file->rra_def, rra->data, rra->len);
-
- return TRUE;
-}
-
-/**
- * Finalize rrd file header and initialize all RRA in the file
- * @param filename file path
- * @param err error pointer
- * @return TRUE if rrd file is ready for use
- */
-gboolean
-rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
-{
- gint fd;
- guint i;
- gint count = 0;
- gdouble vbuf[1024];
- struct stat st;
-
- if (file == NULL || file->filename == NULL) {
- g_set_error (err, rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
- return FALSE;
- }
-
- fd = open (file->filename, O_RDWR);
- if (fd == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
- return FALSE;
- }
-
- if (lseek (fd, 0, SEEK_END) == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd seek error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
-
- /* Adjust CDP */
- for (i = 0; i < file->stat_head->rra_cnt; i ++) {
- file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv = 0;
- /* Randomize row pointer */
- file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt;
- /* Calculate values count */
- count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
- }
-
- munmap (file->map, file->size);
- /* Write values */
- for (i = 0; i < G_N_ELEMENTS (vbuf); i ++) {
- vbuf[i] = NAN;
- }
-
- while (count > 0) {
- /* Write values in buffered matter */
- if (write (fd, vbuf, MIN ((gint)G_N_ELEMENTS (vbuf), count) * sizeof (gdouble)) == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
- count -= G_N_ELEMENTS (vbuf);
- }
-
- if (fstat (fd, &st) == -1) {
- g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
- close (fd);
- return FALSE;
- }
-
- /* Mmap again */
- file->size = st.st_size;
- if ((file->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) {
- close (fd);
- g_set_error (err, rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
- g_slice_free1 (sizeof (struct rspamd_rrd_file), file);
- return FALSE;
- }
- close (fd);
- /* Adjust pointers */
- rspamd_rrd_adjust_pointers (file, TRUE);
-
- file->finalized = TRUE;
-
- return TRUE;
-}
-
-/**
- * Update pdp_prep data
- * @param file rrd file
- * @param vals new values
- * @param pdp_new new pdp array
- * @param interval time elapsed from the last update
- * @return
- */
-static gboolean
-rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble *pdp_new, gdouble interval)
-{
- guint i;
- enum rrd_dst_type type;
-
- for (i = 0; i < file->stat_head->ds_cnt; i ++) {
- type = rrd_dst_from_string (file->ds_def[i].dst);
-
- if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv < interval) {
- rspamd_strlcpy (file->pdp_prep[i].last_ds, "U", sizeof (file->pdp_prep[i].last_ds));
- }
-
- if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv >= interval) {
- switch (type) {
- case RRD_DST_COUNTER:
- case RRD_DST_DERIVE:
- if (file->pdp_prep[i].last_ds[0] == 'U') {
- pdp_new[i] = NAN;
- }
- else {
- pdp_new[i] = vals[i] - strtod (file->pdp_prep[i].last_ds, NULL);
- }
- break;
- case RRD_DST_GAUGE:
- pdp_new[i] = vals[i] * interval;
- break;
- case RRD_DST_ABSOLUTE:
- pdp_new[i] = vals[i];
- break;
- default:
- return FALSE;
- }
- }
- else {
- pdp_new[i] = NAN;
- }
- /* Copy value to the last_ds */
- if (!isnan (vals[i])) {
- rspamd_snprintf (file->pdp_prep[i].last_ds, sizeof (file->pdp_prep[i].last_ds), "%.4f", vals[i]);
- }
- else {
- file->pdp_prep[i].last_ds[0] = 'U';
- file->pdp_prep[i].last_ds[1] = '\0';
- }
- }
-
-
- return TRUE;
-}
-
-/**
- * Update step for this pdp
- * @param file
- * @param pdp_new new pdp array
- * @param pdp_temp temp pdp array
- * @param interval time till last update
- * @param pre_int pre interval
- * @param post_int post intervall
- * @param pdp_diff time till last pdp update
- */
-static void
-rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdouble *pdp_temp, gdouble interval,
- gdouble pre_int, gdouble post_int, gulong pdp_diff)
-{
- guint i;
- rrd_value_t *scratch;
- gulong heartbeat;
-
-
- for (i = 0; i < file->stat_head->ds_cnt; i ++) {
- scratch = file->pdp_prep[i].scratch;
- heartbeat = file->ds_def[i].par[RRD_DS_mrhb_cnt].lv;
- if (!isnan (pdp_new[i])) {
- if (isnan (scratch[PDP_val].dv)) {
- scratch[PDP_val].dv = 0;
- }
- scratch[PDP_val].dv += pdp_new[i] / interval * pre_int;
- pre_int = 0.0;
- }
- /* Check interval value for heartbeat for this DS */
- if ((interval > heartbeat) || (file->stat_head->pdp_step / 2.0 < scratch[PDP_unkn_sec_cnt].lv)) {
- pdp_temp[i] = NAN;
- }
- else {
- pdp_temp[i] = scratch[PDP_val].dv /
- ((double) (pdp_diff - scratch[PDP_unkn_sec_cnt].lv) - pre_int);
- }
-
- if (isnan (pdp_new[i])) {
- scratch[PDP_unkn_sec_cnt].lv = floor (post_int);
- scratch[PDP_val].dv = NAN;
- } else {
- scratch[PDP_unkn_sec_cnt].lv = 0;
- scratch[PDP_val].dv = pdp_new[i] / interval * post_int;
- }
- }
-}
-
-/**
- * Update CDP for this rra
- * @param file rrd file
- * @param pdp_steps how much pdp steps elapsed from the last update
- * @param pdp_offset offset from pdp
- * @param rra_steps how much steps must be updated for this rra
- * @param rra_index index of desired rra
- * @param pdp_temp temporary pdp points
- */
-static void
-rspamd_rrd_update_cdp (struct rspamd_rrd_file *file, gdouble pdp_steps, gdouble pdp_offset, gulong *rra_steps, gulong rra_index,
- gdouble *pdp_temp)
-{
- guint i;
- struct rrd_rra_def *rra;
- rrd_value_t *scratch;
- enum rrd_cf_type cf;
- gdouble last_cdp, cur_cdp;
- gulong pdp_in_cdp;
-
- rra = &file->rra_def[rra_index];
- cf = rrd_cf_from_string (rra->cf_nam);
-
- /* Iterate over all DS for this RRA */
- for (i = 0; i < file->stat_head->ds_cnt; i ++) {
- /* Get CDP for this RRA and DS */
- scratch = file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch;
- if (rra->pdp_cnt > 1) {
- /* Do we have any CDP to update for this rra ? */
- if (rra_steps[rra_index] > 0) {
- if (isnan (pdp_temp[i])) {
- /* New pdp is nan */
- /* Increment unknown points count */
- scratch[CDP_unkn_pdp_cnt].lv += pdp_offset;
- /* Reset secondary value */
- scratch[CDP_secondary_val].dv = NAN;
- }
- else {
- scratch[CDP_secondary_val].dv = pdp_temp[i];
- }
-
- /* Check XFF for this rra */
- if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt * rra->par[RRA_cdp_xff_val].lv) {
- /* XFF is reached */
- scratch[CDP_primary_val].dv = NAN;
- }
- else {
- /* Need to initialize CDP using specified consolidation */
- switch (cf) {
- case RRD_CF_AVERAGE:
- last_cdp = isnan (scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val].dv;
- cur_cdp = isnan (pdp_temp[i]) ? 0.0 : pdp_temp[i];
- scratch[CDP_primary_val].dv = (last_cdp + cur_cdp * pdp_offset) / (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv);
- break;
- case RRD_CF_MAXIMUM:
- last_cdp = isnan (scratch[CDP_val].dv) ? -INFINITY : scratch[CDP_val].dv;
- cur_cdp = isnan (pdp_temp[i]) ? -INFINITY : pdp_temp[i];
- scratch[CDP_primary_val].dv = MAX (last_cdp, cur_cdp);
- break;
- case RRD_CF_MINIMUM:
- last_cdp = isnan (scratch[CDP_val].dv) ? INFINITY : scratch[CDP_val].dv;
- cur_cdp = isnan (pdp_temp[i]) ? INFINITY : pdp_temp[i];
- scratch[CDP_primary_val].dv = MIN (last_cdp, cur_cdp);
- break;
- case RRD_CF_LAST:
- default:
- scratch[CDP_primary_val].dv = pdp_temp[i];
- break;
- }
- }
- /* Init carry of this CDP */
- pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt;
- if (pdp_in_cdp == 0 || isnan (pdp_temp[i])) {
- /* Set overflow */
- switch (cf) {
- case RRD_CF_AVERAGE:
- scratch[CDP_val].dv = 0;
- break;
- case RRD_CF_MAXIMUM:
- scratch[CDP_val].dv = -INFINITY;
- break;
- case RRD_CF_MINIMUM:
- scratch[CDP_val].dv = INFINITY;
- break;
- default:
- scratch[CDP_val].dv = NAN;
- break;
- }
- }
- else {
- /* Special carry for average */
- if (cf == RRD_CF_AVERAGE) {
- scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp;
- }
- else {
- scratch[CDP_val].dv = pdp_temp[i];
- }
- }
- }
- /* In this case we just need to update cdp_prep for this RRA */
- else {
- if (isnan (pdp_temp[i])) {
- /* Just increase undefined zone */
- scratch[CDP_unkn_pdp_cnt].lv += pdp_steps;
- }
- else {
- /* Calculate cdp value */
- last_cdp = scratch[CDP_val].dv;
- switch (cf) {
- case RRD_CF_AVERAGE:
- if (isnan (last_cdp)) {
- scratch[CDP_val].dv = pdp_temp[i] * pdp_steps;
- }
- else {
- scratch[CDP_val].dv = last_cdp + pdp_temp[i] * pdp_steps;
- }
- break;
- case RRD_CF_MAXIMUM:
- scratch[CDP_val].dv = MAX (last_cdp, pdp_temp[i]);
- break;
- case RRD_CF_MINIMUM:
- scratch[CDP_val].dv = MIN (last_cdp, pdp_temp[i]);
- break;
- case RRD_CF_LAST:
- scratch[CDP_val].dv = pdp_temp[i];
- break;
- default:
- scratch[CDP_val].dv = NAN;
- break;
- }
- }
- }
- }
- else {
- /* We have nothing to consolidate, but we may miss some pdp */
- if (pdp_steps > 2) {
- /* Just write PDP value */
- scratch[CDP_primary_val].dv = pdp_temp[i];
- scratch[CDP_secondary_val].dv = pdp_temp[i];
- }
- }
- }
-}
-
-/**
- * Update RRA in a file
- * @param file rrd file
- * @param rra_steps steps for each rra
- * @param now current time
- */
-void
-rspamd_rrd_write_rra (struct rspamd_rrd_file *file, gulong *rra_steps)
-{
- guint i, j, scratch_idx, cdp_idx, k;
- struct rrd_rra_def *rra;
- gdouble *rra_row;
-
- /* Iterate over all RRA */
- for (i = 0; i < file->stat_head->rra_cnt; i ++) {
- rra = &file->rra_def[i];
- /* How much steps need to be updated */
- for (j = 0, scratch_idx = CDP_primary_val; j < rra_steps[i]; j ++, scratch_idx = CDP_secondary_val) {
- /* Move row ptr */
- if (++file->rra_ptr[i].cur_row >= rra->row_cnt) {
- file->rra_ptr[i].cur_row = 0;
- }
- /* Calculate seek */
- rra_row = file->rrd_value + (file->stat_head->ds_cnt * i + file->rra_ptr[i].cur_row);
- /* Iterate over DS */
- for (k = 0; k < file->stat_head->ds_cnt; k ++) {
- cdp_idx = i * file->stat_head->ds_cnt + k;
- memcpy (rra_row, &file->cdp_prep[cdp_idx].scratch[scratch_idx].dv, sizeof (gdouble));
- rra_row ++;
- }
- }
- }
-}
-
-/**
- * Add record to rrd file
- * @param file rrd file object
- * @param points points (must be row suitable for this RRA, depending on ds count)
- * @param err error pointer
- * @return TRUE if a row has been added
- */
-gboolean
-rspamd_rrd_add_record (struct rspamd_rrd_file* file, GArray *points, GError **err)
-{
- gdouble interval, *pdp_new, *pdp_temp, pre_int, post_int;
- guint i;
- gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
- prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
- struct timeval tv;
-
- if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len) {
- g_set_error (err, rrd_error_quark (), EINVAL, "rrd add points failed: wrong arguments");
- return FALSE;
- }
-
- /* Get interval */
- gettimeofday (&tv, NULL);
- interval = (gdouble)(tv.tv_sec - file->live_head->last_up) +
- (gdouble)(tv.tv_usec - file->live_head->last_up_usec) / 1e6f;
-
- /* Update PDP preparation values */
- pdp_new = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
- pdp_temp = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
- /* How much steps need to be updated in each RRA */
- rra_steps = g_malloc0 (sizeof (gulong) * file->stat_head->rra_cnt);
-
- if (!rspamd_rrd_update_pdp_prep (file, (gdouble *)points->data, pdp_new, interval)) {
- g_set_error (err, rrd_error_quark (), EINVAL, "rrd update pdp failed: wrong arguments");
- g_free (pdp_new);
- g_free (pdp_temp);
- g_free (rra_steps);
- return FALSE;
- }
-
- /* Calculate elapsed steps */
- /* Age in seconds for previous pdp store */
- prev_pdp_age = file->live_head->last_up % file->stat_head->pdp_step;
- /* Time in seconds for last pdp update */
- prev_pdp_step = file->live_head->last_up - prev_pdp_age;
- /* Age in seconds from current time to required pdp time */
- cur_pdp_age = tv.tv_sec % file->stat_head->pdp_step;
- /* Time of desired pdp step */
- cur_pdp_step = tv.tv_sec - cur_pdp_age;
-
- if (cur_pdp_step > prev_pdp_step) {
- pre_int = (gdouble)(cur_pdp_step - file->live_head->last_up) - ((double)file->live_head->last_up_usec) / 1e6f;
- post_int = (gdouble)cur_pdp_age + ((double)tv.tv_usec) / 1e6f;
- }
- else {
- pre_int = interval;
- post_int = 0;
- }
- cur_pdp_count = cur_pdp_step / file->stat_head->pdp_step;
- pdp_steps = (cur_pdp_step - prev_pdp_step) / file->stat_head->pdp_step;
-
-
- if (pdp_steps == 0) {
- /* Simple update of pdp prep */
- for (i = 0; i < file->stat_head->ds_cnt; i ++) {
- if (isnan (pdp_new[i])) {
- /* Increment unknown period */
- file->pdp_prep[i].scratch[PDP_unkn_sec_cnt].lv += floor (interval);
- }
- else {
- if (isnan (file->pdp_prep[i].scratch[PDP_val].dv)) {
- /* Reset pdp to the current value */
- file->pdp_prep[i].scratch[PDP_val].dv = pdp_new[i];
- }
- else {
- /* Increment pdp value */
- file->pdp_prep[i].scratch[PDP_val].dv += pdp_new[i];
- }
- }
- }
- }
- else {
- /* Complex update of PDP, CDP and RRA */
-
- /* Update PDP for this step */
- rspamd_rrd_update_pdp_step (file, pdp_new, pdp_temp, interval, pre_int, post_int, pdp_steps * file->stat_head->pdp_step);
-
-
- /* Update CDP points for each RRA*/
- for (i = 0; i < file->stat_head->rra_cnt; i ++) {
- /* Calculate pdp offset for this RRA */
- pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count % file->rra_def[i].pdp_cnt;
- /* How much steps we got for this RRA */
- if (pdp_offset <= pdp_steps) {
- rra_steps[i] = (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1;
- }
- else {
- /* This rra have not passed enough pdp steps */
- rra_steps[i] = 0;
- }
- /* Update this specific CDP */
- rspamd_rrd_update_cdp (file, pdp_steps, pdp_offset, rra_steps, i, pdp_temp);
- /* Write RRA */
- rspamd_rrd_write_rra (file, rra_steps);
- }
- }
- file->live_head->last_up = tv.tv_sec;
- file->live_head->last_up_usec = tv.tv_usec;
-
- /* Sync and invalidate */
- msync (file->map, file->size, MS_ASYNC | MS_INVALIDATE);
-
- g_free (pdp_new);
- g_free (pdp_temp);
- g_free (rra_steps);
-
- return TRUE;
-}
-
-/**
- * Close rrd file
- * @param file
- * @return
- */
-gint
-rspamd_rrd_close (struct rspamd_rrd_file* file)
-{
- if (file == NULL) {
- errno = EINVAL;
- return -1;
- }
-
- munmap (file->map, file->size);
- if (file->filename != NULL) {
- g_free (file->filename);
- }
- g_slice_free1 (sizeof (struct rspamd_rrd_file), file);
-
- return 0;
-}