aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2012-12-14 22:26:27 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2012-12-14 22:26:27 +0400
commitebd09dbe9e4059a6e67a01a0383bed874cdf4d21 (patch)
tree445120ae176a2d4a26a9bae88aa33544c66ed6d6
parent5fc591558a63e7ff592f3e9871dad411284e0cfe (diff)
downloadrspamd-ebd09dbe9e4059a6e67a01a0383bed874cdf4d21.tar.gz
rspamd-ebd09dbe9e4059a6e67a01a0383bed874cdf4d21.zip
* RRD API initial implementation.
-rw-r--r--src/rrd.c446
-rw-r--r--src/rrd.h38
2 files changed, 461 insertions, 23 deletions
diff --git a/src/rrd.c b/src/rrd.c
index b14369f9a..f24b4c3d5 100644
--- a/src/rrd.c
+++ b/src/rrd.c
@@ -24,6 +24,194 @@
#include "config.h"
#include "rrd.h"
+static GQuark
+rrd_error_quark (void)
+{
+ return g_quark_from_static_string ("rrd-error");
+}
+
+/**
+ * Check rrd file for correctness (size, cookies, etc)
+ */
+static gboolean
+rspamd_rrd_check_file (const gchar *filename, gboolean need_data, GError **err)
+{
+ gint fd, i;
+ struct stat st;
+ struct rrd_file_head head;
+ struct rrd_rra_def rra;
+ gint head_size;
+
+ fd = open (filename, O_RDWR);
+ if (fd == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
+ return FALSE;
+ }
+
+ if (fstat (fd, &st) == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+ if (st.st_size < (goffset)sizeof (struct rrd_file_head)) {
+ /* We have trimmed file */
+ g_set_error (err, rrd_error_quark (), EINVAL, "rrd size is bad: %ud", (guint)st.st_size);
+ close (fd);
+ return FALSE;
+ }
+
+ /* Try to read header */
+ if (read (fd, &head, sizeof (head)) != sizeof (head)) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd read head error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+ /* Check magic */
+ if (memcmp (head.cookie, RRD_COOKIE, sizeof (head.cookie)) != 0 ||
+ memcmp (head.version, RRD_VERSION, sizeof (head.version)) != 0 ||
+ head.float_cookie != RRD_FLOAT_COOKIE) {
+ g_set_error (err, rrd_error_quark (), EINVAL, "rrd head cookies error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+ /* Check for other params */
+ if (head.ds_cnt <= 0 || head.rra_cnt <= 0) {
+ g_set_error (err, rrd_error_quark (), EINVAL, "rrd head cookies error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+ /* Now we can calculate the overall size of rrd */
+ head_size = sizeof (struct rrd_file_head) +
+ sizeof (struct rrd_ds_def) * head.ds_cnt +
+ sizeof (struct rrd_rra_def) * head.rra_cnt +
+ sizeof (struct rrd_live_head) +
+ sizeof (struct rrd_pdp_prep) * head.ds_cnt +
+ sizeof (struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt +
+ sizeof (struct rrd_rra_ptr) * head.rra_cnt;
+ if (st.st_size < (goffset)head_size) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd file seems to have stripped header: %d", head_size);
+ close (fd);
+ return FALSE;
+ }
+
+ if (need_data) {
+ /* Now check rra */
+ if (lseek (fd, sizeof (struct rrd_ds_def) * head.ds_cnt, SEEK_CUR) == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd head lseek error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+ for (i = 0; i < (gint)head.rra_cnt; i ++) {
+ if (read (fd, &rra, sizeof (rra)) != sizeof (rra)) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd read rra error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+ head_size += rra.row_cnt * head.ds_cnt;
+ }
+
+ if (st.st_size != head_size) {
+ g_set_error (err, rrd_error_quark (), EINVAL, "rrd file seems to have incorrect size: %d, must be %d", (gint)st.st_size, head_size);
+ close (fd);
+ return FALSE;
+ }
+ }
+
+ close (fd);
+ return TRUE;
+}
+
+/**
+ * Adjust pointers in mmapped rrd file
+ * @param file
+ */
+static void
+rspamd_rrd_adjust_pointers (struct rspamd_rrd_file *file, gboolean completed)
+{
+ guint8 *ptr;
+
+ ptr = file->map;
+ file->stat_head = (struct rrd_file_head *)ptr;
+ ptr += sizeof (struct rrd_file_head);
+ file->ds_def = (struct rrd_ds_def *)ptr;
+ ptr += sizeof (struct rrd_ds_def) * file->stat_head->ds_cnt;
+ file->rra_def = (struct rrd_rra_def *)ptr;
+ ptr += sizeof (struct rrd_rra_def) * file->stat_head->rra_cnt;
+ file->live_head = (struct rrd_live_head *)ptr;
+ ptr += sizeof (struct rrd_live_head);
+ file->pdp_prep = (struct rrd_pdp_prep *)ptr;
+ ptr += sizeof (struct rrd_pdp_prep) * file->stat_head->ds_cnt;
+ file->cdp_prep = (struct rrd_cdp_prep *)ptr;
+ ptr += sizeof (struct rrd_cdp_prep) * file->stat_head->rra_cnt * file->stat_head->ds_cnt;
+ file->rra_ptr = (struct rrd_rra_ptr *)ptr;
+ if (completed) {
+ ptr += sizeof (struct rrd_rra_ptr) * file->stat_head->rra_cnt;
+ file->rrd_value = (gdouble *)ptr;
+ }
+ else {
+ file->rrd_value = NULL;
+ }
+}
+
+/**
+ * Open completed or incompleted rrd file
+ * @param filename
+ * @param completed
+ * @param err
+ * @return
+ */
+static struct rspamd_rrd_file*
+rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err)
+{
+ struct rspamd_rrd_file *new;
+ gint fd;
+ struct stat st;
+
+ if (!rspamd_rrd_check_file (filename, completed, err)) {
+ return NULL;
+ }
+
+ new = g_slice_alloc0 (sizeof (struct rspamd_rrd_file));
+
+ if (new == NULL) {
+ g_set_error (err, rrd_error_quark (), ENOMEM, "not enough memory");
+ return NULL;
+ }
+
+ /* Open file */
+ fd = open (filename, O_RDWR);
+ if (fd == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
+ return FALSE;
+ }
+
+ if (fstat (fd, &st) == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+ /* Mmap file */
+ new->size = st.st_size;
+ if ((new->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+ close (fd);
+ g_set_error (err, rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
+ g_slice_free1 (sizeof (struct rspamd_rrd_file), new);
+ return NULL;
+ }
+
+ close (fd);
+
+ /* Adjust pointers */
+ rspamd_rrd_adjust_pointers (new, completed);
+
+ /* Mark it as finalized */
+ new->finalized = completed;
+
+ new->filename = g_strdup (filename);
+
+ return new;
+}
+
/**
* Open (and mmap) existing RRD file
* @param filename path
@@ -33,7 +221,7 @@
struct rspamd_rrd_file*
rspamd_rrd_open (const gchar *filename, GError **err)
{
- return NULL;
+ return rspamd_rrd_open_common (filename, TRUE, err);
}
/**
@@ -45,10 +233,117 @@ rspamd_rrd_open (const gchar *filename, GError **err)
* @param err error pointer
* @return TRUE if file has been created
*/
-gboolean
+struct rspamd_rrd_file*
rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err)
{
- return FALSE;
+ struct rspamd_rrd_file *new;
+ struct rrd_file_head head;
+ struct rrd_ds_def ds;
+ struct rrd_rra_def rra;
+ struct rrd_live_head lh;
+ struct rrd_pdp_prep pdp;
+ struct rrd_cdp_prep cdp;
+ struct rrd_rra_ptr rra_ptr;
+ gint fd;
+ guint i, j;
+
+ /* Open file */
+ fd = open (filename, O_RDWR | O_CREAT | O_TRUNC, 0644);
+ if (fd == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd create error: %s", strerror (errno));
+ return NULL;
+ }
+
+ /* Fill header */
+ memset (&head, 0, sizeof (head));
+ head.rra_cnt = rra_count;
+ head.ds_cnt = ds_count;
+ head.pdp_step = pdp_step;
+ memcpy (head.cookie, RRD_COOKIE, sizeof (head.cookie));
+ memcpy (head.version, RRD_VERSION, sizeof (head.version));
+ head.float_cookie = RRD_FLOAT_COOKIE;
+
+ if (write (fd, &head, sizeof (head)) != sizeof (head)) {
+ close (fd);
+ g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+ return NULL;
+ }
+
+ /* Fill DS section */
+ memset (&ds.ds_nam, 0, sizeof (ds.ds_nam));
+ memcpy (&ds.dst, "COUNTER", sizeof ("COUNTER"));
+ memset (&ds.par, 0, sizeof (ds.par));
+ for (i = 0; i < ds_count; i ++) {
+ if (write (fd, &ds, sizeof (ds)) != sizeof (ds)) {
+ close (fd);
+ g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+ return NULL;
+ }
+ }
+
+ /* Fill RRA section */
+ memcpy (&rra.cf_nam, "AVERAGE", sizeof ("AVERAGE"));
+ rra.pdp_cnt = 1;
+ memset (&rra.par, 0, sizeof (rra.par));
+ for (i = 0; i < rra_count; i ++) {
+ if (write (fd, &rra, sizeof (rra)) != sizeof (rra)) {
+ close (fd);
+ g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+ return NULL;
+ }
+ }
+
+ /* Fill live header */
+ lh.last_up = time (NULL) - 10;
+ lh.last_up_usec = 0;
+
+ if (write (fd, &lh, sizeof (lh)) != sizeof (lh)) {
+ close (fd);
+ g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+ return NULL;
+ }
+
+ /* Fill pdp prep */
+ memcpy (&pdp.last_ds, "U", sizeof ("U"));
+ memset (&pdp.scratch, 0, sizeof (pdp.scratch));
+ pdp.scratch[PDP_val].dv = 0.;
+ pdp.scratch[PDP_unkn_sec_cnt].lv = lh.last_up % pdp_step;
+ for (i = 0; i < ds_count; i ++) {
+ if (write (fd, &pdp, sizeof (pdp)) != sizeof (pdp)) {
+ close (fd);
+ g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+ return NULL;
+ }
+ }
+
+ /* Fill cdp prep */
+ memset (&cdp.scratch, 0, sizeof (cdp.scratch));
+ cdp.scratch[CDP_val].dv = NAN;
+ for (i = 0; i < rra_count; i ++) {
+ cdp.scratch[CDP_unkn_pdp_cnt].lv = ((lh.last_up - pdp.scratch[PDP_unkn_sec_cnt].lv) % (pdp_step * rra.pdp_cnt)) / pdp_step;
+ for (j = 0; j < ds_count; j ++) {
+ if (write (fd, &cdp, sizeof (cdp)) != sizeof (cdp)) {
+ close (fd);
+ g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+ return NULL;
+ }
+ }
+ }
+
+ /* Set row pointers */
+ memset (&rra_ptr, 0, sizeof (rra_ptr));
+ for (i = 0; i < rra_count; i ++) {
+ if (write (fd, &rra_ptr, sizeof (rra_ptr)) != sizeof (rra_ptr)) {
+ close (fd);
+ g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+ return NULL;
+ }
+ }
+
+ close (fd);
+ new = rspamd_rrd_open_common (filename, FALSE, err);
+
+ return new;
}
/**
@@ -59,9 +354,18 @@ rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gul
* @return TRUE if data sources were added
*/
gboolean
-rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err)
+rspamd_rrd_add_ds (struct rspamd_rrd_file *file, GArray *ds, GError **err)
{
- return FALSE;
+
+ if (file == NULL || file->stat_head->ds_cnt != ds->len * sizeof (struct rrd_ds_def)) {
+ g_set_error (err, rrd_error_quark (), EINVAL, "rrd add ds failed: wrong arguments");
+ return FALSE;
+ }
+
+ /* Straightforward memcpy */
+ memcpy (file->ds_def, ds->data, ds->len);
+
+ return TRUE;
}
/**
@@ -72,9 +376,17 @@ rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err)
* @return TRUE if archives were added
*/
gboolean
-rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err)
+rspamd_rrd_add_rra (struct rspamd_rrd_file *file, GArray *rra, GError **err)
{
- return FALSE;
+ if (file == NULL || file->stat_head->rra_cnt != rra->len * sizeof (struct rrd_rra_def)) {
+ g_set_error (err, rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
+ return FALSE;
+ }
+
+ /* Straightforward memcpy */
+ memcpy (file->rra_def, rra->data, rra->len);
+
+ return TRUE;
}
/**
@@ -84,9 +396,76 @@ rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err)
* @return TRUE if rrd file is ready for use
*/
gboolean
-rspamd_rrd_finalize (const gchar *filename, GError **err)
+rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
{
- return FALSE;
+ gint fd;
+ guint i, count = 0;
+ gdouble vbuf[1024];
+ struct stat st;
+
+ if (file == NULL || file->filename == NULL) {
+ g_set_error (err, rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
+ return FALSE;
+ }
+
+ fd = open (file->filename, O_RDWR);
+ if (fd == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
+ return FALSE;
+ }
+
+ if (lseek (fd, 0, SEEK_END) == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd seek error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+
+ /* Adjust CDP */
+ for (i = 0; i < file->stat_head->rra_cnt; i ++) {
+ file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv =
+ ((file->live_head->last_up - file->pdp_prep->scratch[PDP_unkn_sec_cnt].lv) % (file->stat_head->pdp_step *
+ file->rra_def[i].pdp_cnt)) / file->stat_head->pdp_step;
+ /* Randomize row pointer */
+ file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt;
+ /* Calculate values count */
+ count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
+ }
+
+ munmap (file->map, file->size);
+ /* Write values */
+ for (i = 0; i < G_N_ELEMENTS (vbuf); i ++) {
+ vbuf[i] = NAN;
+ }
+
+ while (count > 0) {
+ /* Write values in buffered matter */
+ if (write (fd, vbuf, MIN (G_N_ELEMENTS (vbuf), count) * sizeof (gdouble)) == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+ count -= G_N_ELEMENTS (vbuf);
+ }
+
+ if (fstat (fd, &st) == -1) {
+ g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+
+ /* Mmap again */
+ file->size = st.st_size;
+ if ((file->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+ close (fd);
+ g_set_error (err, rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
+ g_slice_free1 (sizeof (struct rspamd_rrd_file), file);
+ return FALSE;
+ }
+ close (fd);
+ /* Adjust pointers */
+ rspamd_rrd_adjust_pointers (file, TRUE);
+
+ return TRUE;
}
/**
@@ -100,5 +479,52 @@ rspamd_rrd_finalize (const gchar *filename, GError **err)
gboolean
rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err)
{
- return FALSE;
+ gdouble *row;
+ guint i;
+
+ if (file == NULL || file->stat_head->ds_cnt != points->len * sizeof (gdouble) || rra_idx >= file->stat_head->rra_cnt) {
+ g_set_error (err, rrd_error_quark (), EINVAL, "rrd add points failed: wrong arguments");
+ return FALSE;
+ }
+
+ row = file->rrd_value;
+ /* Skip unaffected rra */
+ for (i = 0; i < rra_idx; i ++) {
+ row += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
+ }
+
+ row += file->rra_ptr[rra_idx].cur_row * file->stat_head->ds_cnt;
+
+ /* Increase row index */
+ file->rra_ptr[rra_idx].cur_row ++;
+ if (file->rra_ptr[rra_idx].cur_row >= file->rra_def[rra_idx].row_cnt) {
+ file->rra_ptr[rra_idx].cur_row = 0;
+ }
+
+ /* Write data */
+ memcpy (row, points, points->len);
+
+ return TRUE;
+}
+
+/**
+ * Close rrd file
+ * @param file
+ * @return
+ */
+gint
+rspamd_rrd_close (struct rspamd_rrd_file* file)
+{
+ if (file == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ munmap (file->map, file->size);
+ if (file->filename != NULL) {
+ g_free (file->filename);
+ }
+ g_slice_free1 (sizeof (struct rspamd_rrd_file), file);
+
+ return 0;
}
diff --git a/src/rrd.h b/src/rrd.h
index be22fa354..214f36d03 100644
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -260,15 +260,20 @@ struct rrd_rra_ptr {
/* Final rrd file structure */
struct rspamd_rrd_file {
- struct rrd_file_head *stat_head; /* the static header */
- struct rrd_ds_def *ds_def; /* list of data source definitions */
- struct rrd_rra_def *rra_def; /* list of round robin archive def */
- struct rrd_live_head *live_head; /* rrd v >= 3 last_up with us */
- struct rrd_pdp_prep *pdp_prep; /* pdp data prep area */
- struct rrd_cdp_prep *cdp_prep; /* cdp prep area */
- struct rrd_rra_ptr *rra_ptr; /* list of rra pointers */
- rrd_value_t *rrd_value; /* list of rrd values */
-} rrd_t;
+ struct rrd_file_head *stat_head; /* the static header */
+ struct rrd_ds_def *ds_def; /* list of data source definitions */
+ struct rrd_rra_def *rra_def; /* list of round robin archive def */
+ struct rrd_live_head *live_head; /* rrd v >= 3 last_up with us */
+ struct rrd_pdp_prep *pdp_prep; /* pdp data prep area */
+ struct rrd_cdp_prep *cdp_prep; /* cdp prep area */
+ struct rrd_rra_ptr *rra_ptr; /* list of rra pointers */
+ gdouble *rrd_value; /* list of rrd values */
+
+ gchar *filename;
+ guint8* map; /* mmapped area */
+ gsize size; /* its size */
+ gboolean finalized;
+};
/* Public API */
@@ -290,7 +295,7 @@ struct rspamd_rrd_file* rspamd_rrd_open (const gchar *filename, GError **err);
* @param err error pointer
* @return TRUE if file has been created
*/
-gboolean rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err);
+struct rspamd_rrd_file* rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err);
/**
* Add data sources to rrd file
@@ -299,7 +304,7 @@ gboolean rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_c
* @param err error pointer
* @return TRUE if data sources were added
*/
-gboolean rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err);
+gboolean rspamd_rrd_add_ds (struct rspamd_rrd_file* file, GArray *ds, GError **err);
/**
* Add round robin archives to rrd file
@@ -308,7 +313,7 @@ gboolean rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err);
* @param err error pointer
* @return TRUE if archives were added
*/
-gboolean rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err);
+gboolean rspamd_rrd_add_rra (struct rspamd_rrd_file *file, GArray *rra, GError **err);
/**
* Finalize rrd file header and initialize all RRA in the file
@@ -316,7 +321,7 @@ gboolean rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err);
* @param err error pointer
* @return TRUE if rrd file is ready for use
*/
-gboolean rspamd_rrd_finalize (const gchar *filename, GError **err);
+gboolean rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err);
/**
* Add record to rrd file
@@ -328,4 +333,11 @@ gboolean rspamd_rrd_finalize (const gchar *filename, GError **err);
*/
gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err);
+/**
+ * Close rrd file
+ * @param file
+ * @return
+ */
+gint rspamd_rrd_close (struct rspamd_rrd_file* file);
+
#endif /* RRD_H_ */