aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-12-18 22:12:45 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-12-18 22:12:45 +0400
commitca860398fe9d8dfc43c518cea9f41a52d87aa255 (patch)
treed2de54802f483c5e98fcfc8b889e7285bfb7f3bd
parentf3b54e28cab8b54ba09536f52268a37713334598 (diff)
downloadrspamd-ca860398fe9d8dfc43c518cea9f41a52d87aa255.tar.gz
rspamd-ca860398fe9d8dfc43c518cea9f41a52d87aa255.zip
Implement rra writing.
-rw-r--r--src/rrd.c65
-rw-r--r--src/rrd.h5
-rw-r--r--test/rspamd_rrd_test.c27
3 files changed, 59 insertions, 38 deletions
diff --git a/src/rrd.c b/src/rrd.c
index 34d956a16..a971e5003 100644
--- a/src/rrd.c
+++ b/src/rrd.c
@@ -139,7 +139,7 @@ void
rrd_make_default_ds (const gchar *name, gulong pdp_step, struct rrd_ds_def *ds)
{
rspamd_strlcpy (ds->ds_nam, name, sizeof (ds->ds_nam));
- rspamd_strlcpy (ds->dst, "AVERAGE", sizeof (ds->dst));
+ rspamd_strlcpy (ds->dst, "COUNTER", sizeof (ds->dst));
memset (ds->par, 0, sizeof (ds->par));
ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2;
ds->par[RRD_DS_min_val].dv = NAN;
@@ -838,28 +838,60 @@ rspamd_rrd_update_cdp (struct rspamd_rrd_file *file, gdouble pdp_steps, gdouble
}
/**
+ * Update RRA in a file
+ * @param file rrd file
+ * @param rra_steps steps for each rra
+ * @param now current time
+ */
+void
+rspamd_rrd_write_rra (struct rspamd_rrd_file *file, gulong *rra_steps)
+{
+ guint i, j, scratch_idx, cdp_idx, k;
+ struct rrd_rra_def *rra;
+ gdouble *rra_row;
+
+ /* Iterate over all RRA */
+ for (i = 0; i < file->stat_head->rra_cnt; i ++) {
+ rra = &file->rra_def[i];
+ /* How much steps need to be updated */
+ for (j = 0, scratch_idx = CDP_primary_val; j < rra_steps[i]; j ++, scratch_idx = CDP_secondary_val) {
+ /* Move row ptr */
+ if (++file->rra_ptr[i].cur_row >= rra->row_cnt) {
+ file->rra_ptr[i].cur_row = 0;
+ }
+ /* Calculate seek */
+ rra_row = file->rrd_value + (file->stat_head->ds_cnt * i + file->rra_ptr[i].cur_row);
+ /* Iterate over DS */
+ for (k = 0; k < file->stat_head->ds_cnt; k ++) {
+ cdp_idx = i * file->stat_head->ds_cnt + k;
+ memcpy (rra_row, &file->cdp_prep[cdp_idx].scratch[scratch_idx].dv, sizeof (gdouble));
+ rra_row ++;
+ }
+ }
+ }
+}
+
+/**
* Add record to rrd file
* @param file rrd file object
- * @param rra_idx index of rra being added
* @param points points (must be row suitable for this RRA, depending on ds count)
* @param err error pointer
* @return TRUE if a row has been added
*/
gboolean
-rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err)
+rspamd_rrd_add_record (struct rspamd_rrd_file* file, GArray *points, GError **err)
{
- gdouble *row, interval, *pdp_new, *pdp_temp, pre_int, post_int;
+ gdouble interval, *pdp_new, *pdp_temp, pre_int, post_int;
guint i;
gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
struct timeval tv;
- if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len || rra_idx >= file->stat_head->rra_cnt) {
+ if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len) {
g_set_error (err, rrd_error_quark (), EINVAL, "rrd add points failed: wrong arguments");
return FALSE;
}
- row = file->rrd_value;
/* Get interval */
gettimeofday (&tv, NULL);
interval = (gdouble)(tv.tv_sec - file->live_head->last_up) +
@@ -937,24 +969,15 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
}
/* Update this specific CDP */
rspamd_rrd_update_cdp (file, pdp_steps, pdp_offset, rra_steps, i, pdp_temp);
+ /* Write RRA */
+ rspamd_rrd_write_rra (file, rra_steps);
}
}
+ file->live_head->last_up = tv.tv_sec;
+ file->live_head->last_up_usec = tv.tv_usec;
- /* Skip unaffected rra */
- for (i = 0; i < rra_idx; i ++) {
- row += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
- }
-
- row += file->rra_ptr[rra_idx].cur_row * file->stat_head->ds_cnt;
-
- /* Increase row index */
- file->rra_ptr[rra_idx].cur_row ++;
- if (file->rra_ptr[rra_idx].cur_row >= file->rra_def[rra_idx].row_cnt) {
- file->rra_ptr[rra_idx].cur_row = 0;
- }
-
- /* Write data */
- memcpy (row, points->data, points->len);
+ /* Sync and invalidate */
+ msync (file->map, file->size, MS_ASYNC | MS_INVALIDATE);
g_free (pdp_new);
g_free (pdp_temp);
diff --git a/src/rrd.h b/src/rrd.h
index 158965caa..ff6902894 100644
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -32,7 +32,7 @@
*/
#define RRD_COOKIE "RRD"
-#define RRD_VERSION "0004"
+#define RRD_VERSION "0003"
#define RRD_FLOAT_COOKIE ((double)8.642135E130)
typedef union {
@@ -326,12 +326,11 @@ gboolean rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err);
/**
* Add record to rrd file
* @param file rrd file object
- * @param rra_idx index of rra being added
* @param points points (must be row suitable for this RRA, depending on ds count)
* @param err error pointer
* @return TRUE if a row has been added
*/
-gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err);
+gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, GArray *points, GError **err);
/**
* Close rrd file
diff --git a/test/rspamd_rrd_test.c b/test/rspamd_rrd_test.c
index 04d523e6c..4d2347b1f 100644
--- a/test/rspamd_rrd_test.c
+++ b/test/rspamd_rrd_test.c
@@ -33,6 +33,7 @@ rspamd_rrd_test_func ()
struct rrd_rra_def rra;
struct rrd_ds_def ds;
GArray ar;
+ GError *err = NULL;
struct rspamd_rrd_file *rrd;
gint fd, i;
gdouble t;
@@ -40,36 +41,34 @@ rspamd_rrd_test_func ()
rspamd_snprintf (tmpfile, sizeof (tmpfile), "/tmp/rspamd_rrd.rrd");
/* Create sample rrd */
- g_assert ((rrd = rspamd_rrd_create (tmpfile, 1, 1, 5, NULL)) != NULL);
+ g_assert ((rrd = rspamd_rrd_create (tmpfile, 1, 1, 5, &err)) != NULL);
/* Add RRA */
- rspamd_strlcpy (rra.cf_nam, rrd_cf_to_string (RRD_CF_AVERAGE), sizeof (rra.cf_nam));
- rra.pdp_cnt = 1;
- rra.row_cnt = 100;
+ rrd_make_default_rra ("AVERAGE", 2, 100, &rra);
ar.data = &rra;
ar.len = sizeof (rra);
- g_assert (rspamd_rrd_add_rra (rrd, &ar, NULL));
+ g_assert (rspamd_rrd_add_rra (rrd, &ar, &err));
/* Add DS */
- rspamd_strlcpy (ds.dst, rrd_dst_to_string (RRD_DST_ABSOLUTE), sizeof (ds.dst));
- rspamd_strlcpy (ds.ds_nam, "test", sizeof (ds.ds_nam));
+ rrd_make_default_ds ("test", 1, &ds);
ar.data = &ds;
ar.len = sizeof (ds);
- g_assert (rspamd_rrd_add_ds (rrd, &ar, NULL));
+ g_assert (rspamd_rrd_add_ds (rrd, &ar, &err));
/* Finalize */
- g_assert (rspamd_rrd_finalize (rrd, NULL));
+ g_assert (rspamd_rrd_finalize (rrd, &err));
/* Close */
rspamd_rrd_close (rrd);
/* Reopen */
- g_assert ((rrd = rspamd_rrd_open (tmpfile, NULL)) != NULL);
-
+ g_assert ((rrd = rspamd_rrd_open (tmpfile, &err)) != NULL);
+#if 0
/* Add some points */
- for (i = 0; i < 200; i ++) {
+ for (i = 0; i < 10; i ++) {
t = i;
ar.data = &t;
ar.len = sizeof (gdouble);
- g_assert (rspamd_rrd_add_record (rrd, 0, &ar, NULL));
+ g_assert (rspamd_rrd_add_record (rrd, &ar, &err));
+ sleep (1);
}
-
+#endif
/* Finish */
rspamd_rrd_close (rrd);
/* unlink (tmpfile); */