Browse Source

Add CDP updates.

tags/0.5.4
Vsevolod Stakhov 11 years ago
parent
commit
f3b54e28ca
1 changed files with 182 additions and 1 deletions
  1. 182
    1
      src/rrd.c

+ 182
- 1
src/rrd.c View File

@@ -587,6 +587,14 @@ rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
return TRUE;
}

/**
* Update pdp_prep data
* @param file rrd file
* @param vals new values
* @param pdp_new new pdp array
* @param interval time elapsed from the last update
* @return
*/
static gboolean
rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble *pdp_new, gdouble interval)
{
@@ -638,6 +646,16 @@ rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble
return TRUE;
}

/**
* Update step for this pdp
* @param file
* @param pdp_new new pdp array
* @param pdp_temp temp pdp array
* @param interval time till last update
* @param pre_int pre interval
* @param post_int post intervall
* @param pdp_diff time till last pdp update
*/
static void
rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdouble *pdp_temp, gdouble interval,
gdouble pre_int, gdouble post_int, gulong pdp_diff)
@@ -675,6 +693,150 @@ rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdou
}
}

/**
* Update CDP for this rra
* @param file rrd file
* @param pdp_steps how much pdp steps elapsed from the last update
* @param pdp_offset offset from pdp
* @param rra_steps how much steps must be updated for this rra
* @param rra_index index of desired rra
* @param pdp_temp temporary pdp points
*/
static void
rspamd_rrd_update_cdp (struct rspamd_rrd_file *file, gdouble pdp_steps, gdouble pdp_offset, gulong *rra_steps, gulong rra_index,
gdouble *pdp_temp)
{
guint i;
struct rrd_rra_def *rra;
rrd_value_t *scratch;
enum rrd_cf_type cf;
gdouble last_cdp, cur_cdp;
gulong pdp_in_cdp;

rra = &file->rra_def[rra_index];
cf = rrd_cf_from_string (rra->cf_nam);

/* Iterate over all DS for this RRA */
for (i = 0; i < file->stat_head->ds_cnt; i ++) {
/* Get CDP for this RRA and DS */
scratch = file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch;
if (rra->pdp_cnt > 1) {
/* Do we have any CDP to update for this rra ? */
if (rra_steps[rra_index] > 0) {
if (isnan (pdp_temp[i])) {
/* New pdp is nan */
/* Increment unknown points count */
scratch[CDP_unkn_pdp_cnt].lv += pdp_offset;
/* Reset secondary value */
scratch[CDP_secondary_val].dv = NAN;
}
else {
scratch[CDP_secondary_val].dv = pdp_temp[i];
}

/* Check XFF for this rra */
if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt * rra->par[RRA_cdp_xff_val].lv) {
/* XFF is reached */
scratch[CDP_primary_val].dv = NAN;
}
else {
/* Need to initialize CDP using specified consolidation */
switch (cf) {
case RRD_CF_AVERAGE:
last_cdp = isnan (scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val].dv;
cur_cdp = isnan (pdp_temp[i]) ? 0.0 : pdp_temp[i];
scratch[CDP_primary_val].dv = (last_cdp + cur_cdp * pdp_offset) / (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv);
break;
case RRD_CF_MAXIMUM:
last_cdp = isnan (scratch[CDP_val].dv) ? -INFINITY : scratch[CDP_val].dv;
cur_cdp = isnan (pdp_temp[i]) ? -INFINITY : pdp_temp[i];
scratch[CDP_primary_val].dv = MAX (last_cdp, cur_cdp);
break;
case RRD_CF_MINIMUM:
last_cdp = isnan (scratch[CDP_val].dv) ? INFINITY : scratch[CDP_val].dv;
cur_cdp = isnan (pdp_temp[i]) ? INFINITY : pdp_temp[i];
scratch[CDP_primary_val].dv = MIN (last_cdp, cur_cdp);
break;
case RRD_CF_LAST:
default:
scratch[CDP_primary_val].dv = pdp_temp[i];
break;
}
}
/* Init carry of this CDP */
pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt;
if (pdp_in_cdp == 0 || isnan (pdp_temp[i])) {
/* Set overflow */
switch (cf) {
case RRD_CF_AVERAGE:
scratch[CDP_val].dv = 0;
break;
case RRD_CF_MAXIMUM:
scratch[CDP_val].dv = -INFINITY;
break;
case RRD_CF_MINIMUM:
scratch[CDP_val].dv = INFINITY;
break;
default:
scratch[CDP_val].dv = NAN;
break;
}
}
else {
/* Special carry for average */
if (cf == RRD_CF_AVERAGE) {
scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp;
}
else {
scratch[CDP_val].dv = pdp_temp[i];
}
}
}
/* In this case we just need to update cdp_prep for this RRA */
else {
if (isnan (pdp_temp[i])) {
/* Just increase undefined zone */
scratch[CDP_unkn_pdp_cnt].lv += pdp_steps;
}
else {
/* Calculate cdp value */
last_cdp = scratch[CDP_val].dv;
switch (cf) {
case RRD_CF_AVERAGE:
if (isnan (last_cdp)) {
scratch[CDP_val].dv = pdp_temp[i] * pdp_steps;
}
else {
scratch[CDP_val].dv = last_cdp + pdp_temp[i] * pdp_steps;
}
break;
case RRD_CF_MAXIMUM:
scratch[CDP_val].dv = MAX (last_cdp, pdp_temp[i]);
break;
case RRD_CF_MINIMUM:
scratch[CDP_val].dv = MIN (last_cdp, pdp_temp[i]);
break;
case RRD_CF_LAST:
scratch[CDP_val].dv = pdp_temp[i];
break;
default:
scratch[CDP_val].dv = NAN;
break;
}
}
}
}
else {
/* We have nothing to consolidate, but we may miss some pdp */
if (pdp_steps > 2) {
/* Just write PDP value */
scratch[CDP_primary_val].dv = pdp_temp[i];
scratch[CDP_secondary_val].dv = pdp_temp[i];
}
}
}
}

/**
* Add record to rrd file
* @param file rrd file object
@@ -689,7 +851,7 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
gdouble *row, interval, *pdp_new, *pdp_temp, pre_int, post_int;
guint i;
gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
prev_pdp_age, cur_pdp_age;
prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
struct timeval tv;

if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len || rra_idx >= file->stat_head->rra_cnt) {
@@ -706,11 +868,14 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
/* Update PDP preparation values */
pdp_new = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
pdp_temp = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
/* How much steps need to be updated in each RRA */
rra_steps = g_malloc (sizeof (gulong) * file->stat_head->rra_cnt);

if (!rspamd_rrd_update_pdp_prep (file, (gdouble *)points->data, pdp_new, interval)) {
g_set_error (err, rrd_error_quark (), EINVAL, "rrd update pdp failed: wrong arguments");
g_free (pdp_new);
g_free (pdp_temp);
g_free (rra_steps);
return FALSE;
}

@@ -758,6 +923,21 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
/* Update PDP for this step */
rspamd_rrd_update_pdp_step (file, pdp_new, pdp_temp, interval, pre_int, post_int, pdp_steps * file->stat_head->pdp_step);

/* Update CDP points for each RRA*/
for (i = 0; i < file->stat_head->rra_cnt; i ++) {
/* Calculate pdp offset for this RRA */
pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count % file->rra_def[i].pdp_cnt;
/* How much steps we got for this RRA */
if (pdp_offset <= pdp_steps) {
rra_steps[i] = (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1;
}
else {
/* This rra have not passed enough pdp steps */
rra_steps[i] = 0;
}
/* Update this specific CDP */
rspamd_rrd_update_cdp (file, pdp_steps, pdp_offset, rra_steps, i, pdp_temp);
}
}

/* Skip unaffected rra */
@@ -778,6 +958,7 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin

g_free (pdp_new);
g_free (pdp_temp);
g_free (rra_steps);

return TRUE;
}

Loading…
Cancel
Save