]> source.dussan.org Git - rspamd.git/commitdiff
Implement rra writing.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 18 Dec 2012 18:12:45 +0000 (22:12 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 18 Dec 2012 18:12:45 +0000 (22:12 +0400)
src/rrd.c
src/rrd.h
test/rspamd_rrd_test.c

index 34d956a1603117385b768f23a3e384a80371f603..a971e500344f64f59c2d3d429ff8993f9c1ef465 100644 (file)
--- a/src/rrd.c
+++ b/src/rrd.c
@@ -139,7 +139,7 @@ void
 rrd_make_default_ds (const gchar *name, gulong pdp_step, struct rrd_ds_def *ds)
 {
        rspamd_strlcpy (ds->ds_nam, name, sizeof (ds->ds_nam));
-       rspamd_strlcpy (ds->dst, "AVERAGE", sizeof (ds->dst));
+       rspamd_strlcpy (ds->dst, "COUNTER", sizeof (ds->dst));
        memset (ds->par, 0, sizeof (ds->par));
        ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2;
        ds->par[RRD_DS_min_val].dv = NAN;
@@ -837,29 +837,61 @@ rspamd_rrd_update_cdp (struct rspamd_rrd_file *file, gdouble pdp_steps, gdouble
        }
 }
 
+/**
+ * Update RRA in a file
+ * @param file rrd file
+ * @param rra_steps steps for each rra
+ * @param now current time
+ */
+void
+rspamd_rrd_write_rra (struct rspamd_rrd_file *file, gulong *rra_steps)
+{
+       guint                                                                    i, j, scratch_idx, cdp_idx, k;
+       struct rrd_rra_def                                              *rra;
+       gdouble                                                                 *rra_row;
+
+       /* Iterate over all RRA */
+       for (i = 0; i < file->stat_head->rra_cnt; i ++) {
+               rra = &file->rra_def[i];
+               /* How much steps need to be updated */
+               for (j = 0, scratch_idx = CDP_primary_val; j < rra_steps[i]; j ++, scratch_idx = CDP_secondary_val) {
+                       /* Move row ptr */
+                       if (++file->rra_ptr[i].cur_row >= rra->row_cnt) {
+                               file->rra_ptr[i].cur_row = 0;
+                       }
+                       /* Calculate seek */
+                       rra_row = file->rrd_value + (file->stat_head->ds_cnt * i + file->rra_ptr[i].cur_row);
+                       /* Iterate over DS */
+                       for (k = 0; k < file->stat_head->ds_cnt; k ++) {
+                               cdp_idx = i * file->stat_head->ds_cnt + k;
+                               memcpy (rra_row, &file->cdp_prep[cdp_idx].scratch[scratch_idx].dv, sizeof (gdouble));
+                               rra_row ++;
+                       }
+               }
+       }
+}
+
 /**
  * Add record to rrd file
  * @param file rrd file object
- * @param rra_idx index of rra being added
  * @param points points (must be row suitable for this RRA, depending on ds count)
  * @param err error pointer
  * @return TRUE if a row has been added
  */
 gboolean
-rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err)
+rspamd_rrd_add_record (struct rspamd_rrd_file* file, GArray *points, GError **err)
 {
-       gdouble                                                                 *row, interval, *pdp_new, *pdp_temp, pre_int, post_int;
+       gdouble                                                                  interval, *pdp_new, *pdp_temp, pre_int, post_int;
        guint                                                                    i;
        gulong                                                                   pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
                                                                                         prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
        struct timeval                                                   tv;
 
-       if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len || rra_idx >= file->stat_head->rra_cnt) {
+       if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len) {
                g_set_error (err, rrd_error_quark (), EINVAL, "rrd add points failed: wrong arguments");
                return FALSE;
        }
 
-       row = file->rrd_value;
        /* Get interval */
        gettimeofday (&tv, NULL);
        interval = (gdouble)(tv.tv_sec - file->live_head->last_up) +
@@ -937,24 +969,15 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
                        }
                        /* Update this specific CDP */
                        rspamd_rrd_update_cdp (file, pdp_steps, pdp_offset, rra_steps, i, pdp_temp);
+                       /* Write RRA */
+                       rspamd_rrd_write_rra (file, rra_steps);
                }
        }
+       file->live_head->last_up = tv.tv_sec;
+       file->live_head->last_up_usec = tv.tv_usec;
 
-       /* Skip unaffected rra */
-       for (i = 0; i < rra_idx; i ++) {
-               row += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
-       }
-
-       row += file->rra_ptr[rra_idx].cur_row * file->stat_head->ds_cnt;
-
-       /* Increase row index */
-       file->rra_ptr[rra_idx].cur_row ++;
-       if (file->rra_ptr[rra_idx].cur_row >= file->rra_def[rra_idx].row_cnt) {
-               file->rra_ptr[rra_idx].cur_row = 0;
-       }
-
-       /* Write data */
-       memcpy (row, points->data, points->len);
+       /* Sync and invalidate */
+       msync (file->map, file->size, MS_ASYNC | MS_INVALIDATE);
 
        g_free (pdp_new);
        g_free (pdp_temp);
index 158965caafff1342c71ec37329af4a6bcab12402..ff690289410bf84d69b6e3a73897594d1daa786c 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -32,7 +32,7 @@
  */
 
 #define RRD_COOKIE    "RRD"
-#define RRD_VERSION   "0004"
+#define RRD_VERSION   "0003"
 #define RRD_FLOAT_COOKIE  ((double)8.642135E130)
 
 typedef union {
@@ -326,12 +326,11 @@ gboolean rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err);
 /**
  * Add record to rrd file
  * @param file rrd file object
- * @param rra_idx index of rra being added
  * @param points points (must be row suitable for this RRA, depending on ds count)
  * @param err error pointer
  * @return TRUE if a row has been added
  */
-gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err);
+gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, GArray *points, GError **err);
 
 /**
  * Close rrd file
index 04d523e6cb609e1054a1514bac03101cd024c94f..4d2347b1f0d525fd310ba9b43302630685c9c2a1 100644 (file)
@@ -33,6 +33,7 @@ rspamd_rrd_test_func ()
        struct rrd_rra_def rra;
        struct rrd_ds_def ds;
        GArray ar;
+       GError *err = NULL;
        struct rspamd_rrd_file *rrd;
        gint fd, i;
        gdouble t;
@@ -40,36 +41,34 @@ rspamd_rrd_test_func ()
        rspamd_snprintf (tmpfile, sizeof (tmpfile), "/tmp/rspamd_rrd.rrd");
 
        /* Create sample rrd */
-       g_assert ((rrd = rspamd_rrd_create (tmpfile, 1, 1, 5, NULL)) != NULL);
+       g_assert ((rrd = rspamd_rrd_create (tmpfile, 1, 1, 5, &err)) != NULL);
        /* Add RRA */
-       rspamd_strlcpy (rra.cf_nam, rrd_cf_to_string (RRD_CF_AVERAGE), sizeof (rra.cf_nam));
-       rra.pdp_cnt = 1;
-       rra.row_cnt = 100;
+       rrd_make_default_rra ("AVERAGE", 2, 100, &rra);
        ar.data = &rra;
        ar.len = sizeof (rra);
-       g_assert (rspamd_rrd_add_rra (rrd, &ar, NULL));
+       g_assert (rspamd_rrd_add_rra (rrd, &ar, &err));
        /* Add DS */
-       rspamd_strlcpy (ds.dst, rrd_dst_to_string (RRD_DST_ABSOLUTE), sizeof (ds.dst));
-       rspamd_strlcpy (ds.ds_nam, "test", sizeof (ds.ds_nam));
+       rrd_make_default_ds ("test", 1, &ds);
        ar.data = &ds;
        ar.len = sizeof (ds);
-       g_assert (rspamd_rrd_add_ds (rrd, &ar, NULL));
+       g_assert (rspamd_rrd_add_ds (rrd, &ar, &err));
        /* Finalize */
-       g_assert (rspamd_rrd_finalize (rrd, NULL));
+       g_assert (rspamd_rrd_finalize (rrd, &err));
        /* Close */
        rspamd_rrd_close (rrd);
 
        /* Reopen */
-       g_assert ((rrd = rspamd_rrd_open (tmpfile, NULL)) != NULL);
-
+       g_assert ((rrd = rspamd_rrd_open (tmpfile, &err)) != NULL);
+#if 0
        /* Add some points */
-       for (i = 0; i < 200; i ++) {
+       for (i = 0; i < 10; i ++) {
                t = i;
                ar.data = &t;
                ar.len = sizeof (gdouble);
-               g_assert (rspamd_rrd_add_record (rrd, 0, &ar, NULL));
+               g_assert (rspamd_rrd_add_record (rrd, &ar, &err));
+               sleep (1);
        }
-
+#endif
        /* Finish */
        rspamd_rrd_close (rrd);
        /* unlink (tmpfile); */