From f3b54e28cab8b54ba09536f52268a37713334598 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Mon, 17 Dec 2012 22:56:03 +0400 Subject: [PATCH] Add CDP updates. --- src/rrd.c | 183 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 182 insertions(+), 1 deletion(-) diff --git a/src/rrd.c b/src/rrd.c index c82332fda..34d956a16 100644 --- a/src/rrd.c +++ b/src/rrd.c @@ -587,6 +587,14 @@ rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err) return TRUE; } +/** + * Update pdp_prep data + * @param file rrd file + * @param vals new values + * @param pdp_new new pdp array + * @param interval time elapsed from the last update + * @return + */ static gboolean rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble *pdp_new, gdouble interval) { @@ -638,6 +646,16 @@ rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble return TRUE; } +/** + * Update step for this pdp + * @param file + * @param pdp_new new pdp array + * @param pdp_temp temp pdp array + * @param interval time till last update + * @param pre_int pre interval + * @param post_int post intervall + * @param pdp_diff time till last pdp update + */ static void rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdouble *pdp_temp, gdouble interval, gdouble pre_int, gdouble post_int, gulong pdp_diff) @@ -675,6 +693,150 @@ rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdou } } +/** + * Update CDP for this rra + * @param file rrd file + * @param pdp_steps how much pdp steps elapsed from the last update + * @param pdp_offset offset from pdp + * @param rra_steps how much steps must be updated for this rra + * @param rra_index index of desired rra + * @param pdp_temp temporary pdp points + */ +static void +rspamd_rrd_update_cdp (struct rspamd_rrd_file *file, gdouble pdp_steps, gdouble pdp_offset, gulong *rra_steps, gulong rra_index, + gdouble *pdp_temp) +{ + guint i; + struct rrd_rra_def *rra; + rrd_value_t *scratch; + enum rrd_cf_type cf; + gdouble last_cdp, cur_cdp; + gulong pdp_in_cdp; + + rra = &file->rra_def[rra_index]; + cf = rrd_cf_from_string (rra->cf_nam); + + /* Iterate over all DS for this RRA */ + for (i = 0; i < file->stat_head->ds_cnt; i ++) { + /* Get CDP for this RRA and DS */ + scratch = file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch; + if (rra->pdp_cnt > 1) { + /* Do we have any CDP to update for this rra ? */ + if (rra_steps[rra_index] > 0) { + if (isnan (pdp_temp[i])) { + /* New pdp is nan */ + /* Increment unknown points count */ + scratch[CDP_unkn_pdp_cnt].lv += pdp_offset; + /* Reset secondary value */ + scratch[CDP_secondary_val].dv = NAN; + } + else { + scratch[CDP_secondary_val].dv = pdp_temp[i]; + } + + /* Check XFF for this rra */ + if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt * rra->par[RRA_cdp_xff_val].lv) { + /* XFF is reached */ + scratch[CDP_primary_val].dv = NAN; + } + else { + /* Need to initialize CDP using specified consolidation */ + switch (cf) { + case RRD_CF_AVERAGE: + last_cdp = isnan (scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val].dv; + cur_cdp = isnan (pdp_temp[i]) ? 0.0 : pdp_temp[i]; + scratch[CDP_primary_val].dv = (last_cdp + cur_cdp * pdp_offset) / (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv); + break; + case RRD_CF_MAXIMUM: + last_cdp = isnan (scratch[CDP_val].dv) ? -INFINITY : scratch[CDP_val].dv; + cur_cdp = isnan (pdp_temp[i]) ? -INFINITY : pdp_temp[i]; + scratch[CDP_primary_val].dv = MAX (last_cdp, cur_cdp); + break; + case RRD_CF_MINIMUM: + last_cdp = isnan (scratch[CDP_val].dv) ? INFINITY : scratch[CDP_val].dv; + cur_cdp = isnan (pdp_temp[i]) ? INFINITY : pdp_temp[i]; + scratch[CDP_primary_val].dv = MIN (last_cdp, cur_cdp); + break; + case RRD_CF_LAST: + default: + scratch[CDP_primary_val].dv = pdp_temp[i]; + break; + } + } + /* Init carry of this CDP */ + pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt; + if (pdp_in_cdp == 0 || isnan (pdp_temp[i])) { + /* Set overflow */ + switch (cf) { + case RRD_CF_AVERAGE: + scratch[CDP_val].dv = 0; + break; + case RRD_CF_MAXIMUM: + scratch[CDP_val].dv = -INFINITY; + break; + case RRD_CF_MINIMUM: + scratch[CDP_val].dv = INFINITY; + break; + default: + scratch[CDP_val].dv = NAN; + break; + } + } + else { + /* Special carry for average */ + if (cf == RRD_CF_AVERAGE) { + scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp; + } + else { + scratch[CDP_val].dv = pdp_temp[i]; + } + } + } + /* In this case we just need to update cdp_prep for this RRA */ + else { + if (isnan (pdp_temp[i])) { + /* Just increase undefined zone */ + scratch[CDP_unkn_pdp_cnt].lv += pdp_steps; + } + else { + /* Calculate cdp value */ + last_cdp = scratch[CDP_val].dv; + switch (cf) { + case RRD_CF_AVERAGE: + if (isnan (last_cdp)) { + scratch[CDP_val].dv = pdp_temp[i] * pdp_steps; + } + else { + scratch[CDP_val].dv = last_cdp + pdp_temp[i] * pdp_steps; + } + break; + case RRD_CF_MAXIMUM: + scratch[CDP_val].dv = MAX (last_cdp, pdp_temp[i]); + break; + case RRD_CF_MINIMUM: + scratch[CDP_val].dv = MIN (last_cdp, pdp_temp[i]); + break; + case RRD_CF_LAST: + scratch[CDP_val].dv = pdp_temp[i]; + break; + default: + scratch[CDP_val].dv = NAN; + break; + } + } + } + } + else { + /* We have nothing to consolidate, but we may miss some pdp */ + if (pdp_steps > 2) { + /* Just write PDP value */ + scratch[CDP_primary_val].dv = pdp_temp[i]; + scratch[CDP_secondary_val].dv = pdp_temp[i]; + } + } + } +} + /** * Add record to rrd file * @param file rrd file object @@ -689,7 +851,7 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin gdouble *row, interval, *pdp_new, *pdp_temp, pre_int, post_int; guint i; gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step, - prev_pdp_age, cur_pdp_age; + prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset; struct timeval tv; if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len || rra_idx >= file->stat_head->rra_cnt) { @@ -706,11 +868,14 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin /* Update PDP preparation values */ pdp_new = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt); pdp_temp = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt); + /* How much steps need to be updated in each RRA */ + rra_steps = g_malloc (sizeof (gulong) * file->stat_head->rra_cnt); if (!rspamd_rrd_update_pdp_prep (file, (gdouble *)points->data, pdp_new, interval)) { g_set_error (err, rrd_error_quark (), EINVAL, "rrd update pdp failed: wrong arguments"); g_free (pdp_new); g_free (pdp_temp); + g_free (rra_steps); return FALSE; } @@ -758,6 +923,21 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin /* Update PDP for this step */ rspamd_rrd_update_pdp_step (file, pdp_new, pdp_temp, interval, pre_int, post_int, pdp_steps * file->stat_head->pdp_step); + /* Update CDP points for each RRA*/ + for (i = 0; i < file->stat_head->rra_cnt; i ++) { + /* Calculate pdp offset for this RRA */ + pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count % file->rra_def[i].pdp_cnt; + /* How much steps we got for this RRA */ + if (pdp_offset <= pdp_steps) { + rra_steps[i] = (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1; + } + else { + /* This rra have not passed enough pdp steps */ + rra_steps[i] = 0; + } + /* Update this specific CDP */ + rspamd_rrd_update_cdp (file, pdp_steps, pdp_offset, rra_steps, i, pdp_temp); + } } /* Skip unaffected rra */ @@ -778,6 +958,7 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin g_free (pdp_new); g_free (pdp_temp); + g_free (rra_steps); return TRUE; } -- 2.39.5