aboutsummaryrefslogtreecommitdiffstats
path: root/src/rrd.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-12-17 22:56:03 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-12-17 22:56:03 +0400
commitf3b54e28cab8b54ba09536f52268a37713334598 (patch)
treed02582475bdf9800f9f4389f0dc425eedcc91ba9 /src/rrd.c
parentf8707d2206f898c2cd7cad505aa7423b6421d362 (diff)
downloadrspamd-f3b54e28cab8b54ba09536f52268a37713334598.tar.gz
rspamd-f3b54e28cab8b54ba09536f52268a37713334598.zip
Add CDP updates.
Diffstat (limited to 'src/rrd.c')
-rw-r--r--src/rrd.c183
1 files changed, 182 insertions, 1 deletions
diff --git a/src/rrd.c b/src/rrd.c
index c82332fda..34d956a16 100644
--- a/src/rrd.c
+++ b/src/rrd.c
@@ -587,6 +587,14 @@ rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
return TRUE;
}
+/**
+ * Update pdp_prep data
+ * @param file rrd file
+ * @param vals new values
+ * @param pdp_new new pdp array
+ * @param interval time elapsed from the last update
+ * @return
+ */
static gboolean
rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble *pdp_new, gdouble interval)
{
@@ -638,6 +646,16 @@ rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble
return TRUE;
}
+/**
+ * Update step for this pdp
+ * @param file
+ * @param pdp_new new pdp array
+ * @param pdp_temp temp pdp array
+ * @param interval time till last update
+ * @param pre_int pre interval
+ * @param post_int post intervall
+ * @param pdp_diff time till last pdp update
+ */
static void
rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdouble *pdp_temp, gdouble interval,
gdouble pre_int, gdouble post_int, gulong pdp_diff)
@@ -676,6 +694,150 @@ rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdou
}
/**
+ * Update CDP for this rra
+ * @param file rrd file
+ * @param pdp_steps how much pdp steps elapsed from the last update
+ * @param pdp_offset offset from pdp
+ * @param rra_steps how much steps must be updated for this rra
+ * @param rra_index index of desired rra
+ * @param pdp_temp temporary pdp points
+ */
+static void
+rspamd_rrd_update_cdp (struct rspamd_rrd_file *file, gdouble pdp_steps, gdouble pdp_offset, gulong *rra_steps, gulong rra_index,
+ gdouble *pdp_temp)
+{
+ guint i;
+ struct rrd_rra_def *rra;
+ rrd_value_t *scratch;
+ enum rrd_cf_type cf;
+ gdouble last_cdp, cur_cdp;
+ gulong pdp_in_cdp;
+
+ rra = &file->rra_def[rra_index];
+ cf = rrd_cf_from_string (rra->cf_nam);
+
+ /* Iterate over all DS for this RRA */
+ for (i = 0; i < file->stat_head->ds_cnt; i ++) {
+ /* Get CDP for this RRA and DS */
+ scratch = file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch;
+ if (rra->pdp_cnt > 1) {
+ /* Do we have any CDP to update for this rra ? */
+ if (rra_steps[rra_index] > 0) {
+ if (isnan (pdp_temp[i])) {
+ /* New pdp is nan */
+ /* Increment unknown points count */
+ scratch[CDP_unkn_pdp_cnt].lv += pdp_offset;
+ /* Reset secondary value */
+ scratch[CDP_secondary_val].dv = NAN;
+ }
+ else {
+ scratch[CDP_secondary_val].dv = pdp_temp[i];
+ }
+
+ /* Check XFF for this rra */
+ if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt * rra->par[RRA_cdp_xff_val].lv) {
+ /* XFF is reached */
+ scratch[CDP_primary_val].dv = NAN;
+ }
+ else {
+ /* Need to initialize CDP using specified consolidation */
+ switch (cf) {
+ case RRD_CF_AVERAGE:
+ last_cdp = isnan (scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val].dv;
+ cur_cdp = isnan (pdp_temp[i]) ? 0.0 : pdp_temp[i];
+ scratch[CDP_primary_val].dv = (last_cdp + cur_cdp * pdp_offset) / (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv);
+ break;
+ case RRD_CF_MAXIMUM:
+ last_cdp = isnan (scratch[CDP_val].dv) ? -INFINITY : scratch[CDP_val].dv;
+ cur_cdp = isnan (pdp_temp[i]) ? -INFINITY : pdp_temp[i];
+ scratch[CDP_primary_val].dv = MAX (last_cdp, cur_cdp);
+ break;
+ case RRD_CF_MINIMUM:
+ last_cdp = isnan (scratch[CDP_val].dv) ? INFINITY : scratch[CDP_val].dv;
+ cur_cdp = isnan (pdp_temp[i]) ? INFINITY : pdp_temp[i];
+ scratch[CDP_primary_val].dv = MIN (last_cdp, cur_cdp);
+ break;
+ case RRD_CF_LAST:
+ default:
+ scratch[CDP_primary_val].dv = pdp_temp[i];
+ break;
+ }
+ }
+ /* Init carry of this CDP */
+ pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt;
+ if (pdp_in_cdp == 0 || isnan (pdp_temp[i])) {
+ /* Set overflow */
+ switch (cf) {
+ case RRD_CF_AVERAGE:
+ scratch[CDP_val].dv = 0;
+ break;
+ case RRD_CF_MAXIMUM:
+ scratch[CDP_val].dv = -INFINITY;
+ break;
+ case RRD_CF_MINIMUM:
+ scratch[CDP_val].dv = INFINITY;
+ break;
+ default:
+ scratch[CDP_val].dv = NAN;
+ break;
+ }
+ }
+ else {
+ /* Special carry for average */
+ if (cf == RRD_CF_AVERAGE) {
+ scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp;
+ }
+ else {
+ scratch[CDP_val].dv = pdp_temp[i];
+ }
+ }
+ }
+ /* In this case we just need to update cdp_prep for this RRA */
+ else {
+ if (isnan (pdp_temp[i])) {
+ /* Just increase undefined zone */
+ scratch[CDP_unkn_pdp_cnt].lv += pdp_steps;
+ }
+ else {
+ /* Calculate cdp value */
+ last_cdp = scratch[CDP_val].dv;
+ switch (cf) {
+ case RRD_CF_AVERAGE:
+ if (isnan (last_cdp)) {
+ scratch[CDP_val].dv = pdp_temp[i] * pdp_steps;
+ }
+ else {
+ scratch[CDP_val].dv = last_cdp + pdp_temp[i] * pdp_steps;
+ }
+ break;
+ case RRD_CF_MAXIMUM:
+ scratch[CDP_val].dv = MAX (last_cdp, pdp_temp[i]);
+ break;
+ case RRD_CF_MINIMUM:
+ scratch[CDP_val].dv = MIN (last_cdp, pdp_temp[i]);
+ break;
+ case RRD_CF_LAST:
+ scratch[CDP_val].dv = pdp_temp[i];
+ break;
+ default:
+ scratch[CDP_val].dv = NAN;
+ break;
+ }
+ }
+ }
+ }
+ else {
+ /* We have nothing to consolidate, but we may miss some pdp */
+ if (pdp_steps > 2) {
+ /* Just write PDP value */
+ scratch[CDP_primary_val].dv = pdp_temp[i];
+ scratch[CDP_secondary_val].dv = pdp_temp[i];
+ }
+ }
+ }
+}
+
+/**
* Add record to rrd file
* @param file rrd file object
* @param rra_idx index of rra being added
@@ -689,7 +851,7 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
gdouble *row, interval, *pdp_new, *pdp_temp, pre_int, post_int;
guint i;
gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
- prev_pdp_age, cur_pdp_age;
+ prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
struct timeval tv;
if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len || rra_idx >= file->stat_head->rra_cnt) {
@@ -706,11 +868,14 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
/* Update PDP preparation values */
pdp_new = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
pdp_temp = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
+ /* How much steps need to be updated in each RRA */
+ rra_steps = g_malloc (sizeof (gulong) * file->stat_head->rra_cnt);
if (!rspamd_rrd_update_pdp_prep (file, (gdouble *)points->data, pdp_new, interval)) {
g_set_error (err, rrd_error_quark (), EINVAL, "rrd update pdp failed: wrong arguments");
g_free (pdp_new);
g_free (pdp_temp);
+ g_free (rra_steps);
return FALSE;
}
@@ -758,6 +923,21 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
/* Update PDP for this step */
rspamd_rrd_update_pdp_step (file, pdp_new, pdp_temp, interval, pre_int, post_int, pdp_steps * file->stat_head->pdp_step);
+ /* Update CDP points for each RRA*/
+ for (i = 0; i < file->stat_head->rra_cnt; i ++) {
+ /* Calculate pdp offset for this RRA */
+ pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count % file->rra_def[i].pdp_cnt;
+ /* How much steps we got for this RRA */
+ if (pdp_offset <= pdp_steps) {
+ rra_steps[i] = (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1;
+ }
+ else {
+ /* This rra have not passed enough pdp steps */
+ rra_steps[i] = 0;
+ }
+ /* Update this specific CDP */
+ rspamd_rrd_update_cdp (file, pdp_steps, pdp_offset, rra_steps, i, pdp_temp);
+ }
}
/* Skip unaffected rra */
@@ -778,6 +958,7 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
g_free (pdp_new);
g_free (pdp_temp);
+ g_free (rra_steps);
return TRUE;
}