From: Vsevolod Stakhov Date: Tue, 18 Dec 2012 18:12:45 +0000 (+0400) Subject: Implement rra writing. X-Git-Tag: 0.5.4~39 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=ca860398fe9d8dfc43c518cea9f41a52d87aa255;p=rspamd.git Implement rra writing. --- diff --git a/src/rrd.c b/src/rrd.c index 34d956a16..a971e5003 100644 --- a/src/rrd.c +++ b/src/rrd.c @@ -139,7 +139,7 @@ void rrd_make_default_ds (const gchar *name, gulong pdp_step, struct rrd_ds_def *ds) { rspamd_strlcpy (ds->ds_nam, name, sizeof (ds->ds_nam)); - rspamd_strlcpy (ds->dst, "AVERAGE", sizeof (ds->dst)); + rspamd_strlcpy (ds->dst, "COUNTER", sizeof (ds->dst)); memset (ds->par, 0, sizeof (ds->par)); ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2; ds->par[RRD_DS_min_val].dv = NAN; @@ -837,29 +837,61 @@ rspamd_rrd_update_cdp (struct rspamd_rrd_file *file, gdouble pdp_steps, gdouble } } +/** + * Update RRA in a file + * @param file rrd file + * @param rra_steps steps for each rra + * @param now current time + */ +void +rspamd_rrd_write_rra (struct rspamd_rrd_file *file, gulong *rra_steps) +{ + guint i, j, scratch_idx, cdp_idx, k; + struct rrd_rra_def *rra; + gdouble *rra_row; + + /* Iterate over all RRA */ + for (i = 0; i < file->stat_head->rra_cnt; i ++) { + rra = &file->rra_def[i]; + /* How much steps need to be updated */ + for (j = 0, scratch_idx = CDP_primary_val; j < rra_steps[i]; j ++, scratch_idx = CDP_secondary_val) { + /* Move row ptr */ + if (++file->rra_ptr[i].cur_row >= rra->row_cnt) { + file->rra_ptr[i].cur_row = 0; + } + /* Calculate seek */ + rra_row = file->rrd_value + (file->stat_head->ds_cnt * i + file->rra_ptr[i].cur_row); + /* Iterate over DS */ + for (k = 0; k < file->stat_head->ds_cnt; k ++) { + cdp_idx = i * file->stat_head->ds_cnt + k; + memcpy (rra_row, &file->cdp_prep[cdp_idx].scratch[scratch_idx].dv, sizeof (gdouble)); + rra_row ++; + } + } + } +} + /** * Add record to rrd file * @param file rrd file object - * @param rra_idx index of rra being added * @param points points (must be row suitable for this RRA, depending on ds count) * @param err error pointer * @return TRUE if a row has been added */ gboolean -rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err) +rspamd_rrd_add_record (struct rspamd_rrd_file* file, GArray *points, GError **err) { - gdouble *row, interval, *pdp_new, *pdp_temp, pre_int, post_int; + gdouble interval, *pdp_new, *pdp_temp, pre_int, post_int; guint i; gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step, prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset; struct timeval tv; - if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len || rra_idx >= file->stat_head->rra_cnt) { + if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len) { g_set_error (err, rrd_error_quark (), EINVAL, "rrd add points failed: wrong arguments"); return FALSE; } - row = file->rrd_value; /* Get interval */ gettimeofday (&tv, NULL); interval = (gdouble)(tv.tv_sec - file->live_head->last_up) + @@ -937,24 +969,15 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin } /* Update this specific CDP */ rspamd_rrd_update_cdp (file, pdp_steps, pdp_offset, rra_steps, i, pdp_temp); + /* Write RRA */ + rspamd_rrd_write_rra (file, rra_steps); } } + file->live_head->last_up = tv.tv_sec; + file->live_head->last_up_usec = tv.tv_usec; - /* Skip unaffected rra */ - for (i = 0; i < rra_idx; i ++) { - row += file->rra_def[i].row_cnt * file->stat_head->ds_cnt; - } - - row += file->rra_ptr[rra_idx].cur_row * file->stat_head->ds_cnt; - - /* Increase row index */ - file->rra_ptr[rra_idx].cur_row ++; - if (file->rra_ptr[rra_idx].cur_row >= file->rra_def[rra_idx].row_cnt) { - file->rra_ptr[rra_idx].cur_row = 0; - } - - /* Write data */ - memcpy (row, points->data, points->len); + /* Sync and invalidate */ + msync (file->map, file->size, MS_ASYNC | MS_INVALIDATE); g_free (pdp_new); g_free (pdp_temp); diff --git a/src/rrd.h b/src/rrd.h index 158965caa..ff6902894 100644 --- a/src/rrd.h +++ b/src/rrd.h @@ -32,7 +32,7 @@ */ #define RRD_COOKIE "RRD" -#define RRD_VERSION "0004" +#define RRD_VERSION "0003" #define RRD_FLOAT_COOKIE ((double)8.642135E130) typedef union { @@ -326,12 +326,11 @@ gboolean rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err); /** * Add record to rrd file * @param file rrd file object - * @param rra_idx index of rra being added * @param points points (must be row suitable for this RRA, depending on ds count) * @param err error pointer * @return TRUE if a row has been added */ -gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err); +gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, GArray *points, GError **err); /** * Close rrd file diff --git a/test/rspamd_rrd_test.c b/test/rspamd_rrd_test.c index 04d523e6c..4d2347b1f 100644 --- a/test/rspamd_rrd_test.c +++ b/test/rspamd_rrd_test.c @@ -33,6 +33,7 @@ rspamd_rrd_test_func () struct rrd_rra_def rra; struct rrd_ds_def ds; GArray ar; + GError *err = NULL; struct rspamd_rrd_file *rrd; gint fd, i; gdouble t; @@ -40,36 +41,34 @@ rspamd_rrd_test_func () rspamd_snprintf (tmpfile, sizeof (tmpfile), "/tmp/rspamd_rrd.rrd"); /* Create sample rrd */ - g_assert ((rrd = rspamd_rrd_create (tmpfile, 1, 1, 5, NULL)) != NULL); + g_assert ((rrd = rspamd_rrd_create (tmpfile, 1, 1, 5, &err)) != NULL); /* Add RRA */ - rspamd_strlcpy (rra.cf_nam, rrd_cf_to_string (RRD_CF_AVERAGE), sizeof (rra.cf_nam)); - rra.pdp_cnt = 1; - rra.row_cnt = 100; + rrd_make_default_rra ("AVERAGE", 2, 100, &rra); ar.data = &rra; ar.len = sizeof (rra); - g_assert (rspamd_rrd_add_rra (rrd, &ar, NULL)); + g_assert (rspamd_rrd_add_rra (rrd, &ar, &err)); /* Add DS */ - rspamd_strlcpy (ds.dst, rrd_dst_to_string (RRD_DST_ABSOLUTE), sizeof (ds.dst)); - rspamd_strlcpy (ds.ds_nam, "test", sizeof (ds.ds_nam)); + rrd_make_default_ds ("test", 1, &ds); ar.data = &ds; ar.len = sizeof (ds); - g_assert (rspamd_rrd_add_ds (rrd, &ar, NULL)); + g_assert (rspamd_rrd_add_ds (rrd, &ar, &err)); /* Finalize */ - g_assert (rspamd_rrd_finalize (rrd, NULL)); + g_assert (rspamd_rrd_finalize (rrd, &err)); /* Close */ rspamd_rrd_close (rrd); /* Reopen */ - g_assert ((rrd = rspamd_rrd_open (tmpfile, NULL)) != NULL); - + g_assert ((rrd = rspamd_rrd_open (tmpfile, &err)) != NULL); +#if 0 /* Add some points */ - for (i = 0; i < 200; i ++) { + for (i = 0; i < 10; i ++) { t = i; ar.data = &t; ar.len = sizeof (gdouble); - g_assert (rspamd_rrd_add_record (rrd, 0, &ar, NULL)); + g_assert (rspamd_rrd_add_record (rrd, &ar, &err)); + sleep (1); } - +#endif /* Finish */ rspamd_rrd_close (rrd); /* unlink (tmpfile); */