diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-12-14 22:26:27 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2012-12-14 22:26:27 +0400 |
commit | ebd09dbe9e4059a6e67a01a0383bed874cdf4d21 (patch) | |
tree | 445120ae176a2d4a26a9bae88aa33544c66ed6d6 | |
parent | 5fc591558a63e7ff592f3e9871dad411284e0cfe (diff) | |
download | rspamd-ebd09dbe9e4059a6e67a01a0383bed874cdf4d21.tar.gz rspamd-ebd09dbe9e4059a6e67a01a0383bed874cdf4d21.zip |
* RRD API initial implementation.
-rw-r--r-- | src/rrd.c | 446 | ||||
-rw-r--r-- | src/rrd.h | 38 |
2 files changed, 461 insertions, 23 deletions
@@ -24,6 +24,194 @@ #include "config.h" #include "rrd.h" +static GQuark +rrd_error_quark (void) +{ + return g_quark_from_static_string ("rrd-error"); +} + +/** + * Check rrd file for correctness (size, cookies, etc) + */ +static gboolean +rspamd_rrd_check_file (const gchar *filename, gboolean need_data, GError **err) +{ + gint fd, i; + struct stat st; + struct rrd_file_head head; + struct rrd_rra_def rra; + gint head_size; + + fd = open (filename, O_RDWR); + if (fd == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno)); + return FALSE; + } + + if (fstat (fd, &st) == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno)); + close (fd); + return FALSE; + } + if (st.st_size < (goffset)sizeof (struct rrd_file_head)) { + /* We have trimmed file */ + g_set_error (err, rrd_error_quark (), EINVAL, "rrd size is bad: %ud", (guint)st.st_size); + close (fd); + return FALSE; + } + + /* Try to read header */ + if (read (fd, &head, sizeof (head)) != sizeof (head)) { + g_set_error (err, rrd_error_quark (), errno, "rrd read head error: %s", strerror (errno)); + close (fd); + return FALSE; + } + /* Check magic */ + if (memcmp (head.cookie, RRD_COOKIE, sizeof (head.cookie)) != 0 || + memcmp (head.version, RRD_VERSION, sizeof (head.version)) != 0 || + head.float_cookie != RRD_FLOAT_COOKIE) { + g_set_error (err, rrd_error_quark (), EINVAL, "rrd head cookies error: %s", strerror (errno)); + close (fd); + return FALSE; + } + /* Check for other params */ + if (head.ds_cnt <= 0 || head.rra_cnt <= 0) { + g_set_error (err, rrd_error_quark (), EINVAL, "rrd head cookies error: %s", strerror (errno)); + close (fd); + return FALSE; + } + /* Now we can calculate the overall size of rrd */ + head_size = sizeof (struct rrd_file_head) + + sizeof (struct rrd_ds_def) * head.ds_cnt + + sizeof (struct rrd_rra_def) * head.rra_cnt + + sizeof (struct rrd_live_head) + + sizeof (struct rrd_pdp_prep) * head.ds_cnt + + sizeof (struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt + + sizeof (struct rrd_rra_ptr) * head.rra_cnt; + if (st.st_size < (goffset)head_size) { + g_set_error (err, rrd_error_quark (), errno, "rrd file seems to have stripped header: %d", head_size); + close (fd); + return FALSE; + } + + if (need_data) { + /* Now check rra */ + if (lseek (fd, sizeof (struct rrd_ds_def) * head.ds_cnt, SEEK_CUR) == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd head lseek error: %s", strerror (errno)); + close (fd); + return FALSE; + } + for (i = 0; i < (gint)head.rra_cnt; i ++) { + if (read (fd, &rra, sizeof (rra)) != sizeof (rra)) { + g_set_error (err, rrd_error_quark (), errno, "rrd read rra error: %s", strerror (errno)); + close (fd); + return FALSE; + } + head_size += rra.row_cnt * head.ds_cnt; + } + + if (st.st_size != head_size) { + g_set_error (err, rrd_error_quark (), EINVAL, "rrd file seems to have incorrect size: %d, must be %d", (gint)st.st_size, head_size); + close (fd); + return FALSE; + } + } + + close (fd); + return TRUE; +} + +/** + * Adjust pointers in mmapped rrd file + * @param file + */ +static void +rspamd_rrd_adjust_pointers (struct rspamd_rrd_file *file, gboolean completed) +{ + guint8 *ptr; + + ptr = file->map; + file->stat_head = (struct rrd_file_head *)ptr; + ptr += sizeof (struct rrd_file_head); + file->ds_def = (struct rrd_ds_def *)ptr; + ptr += sizeof (struct rrd_ds_def) * file->stat_head->ds_cnt; + file->rra_def = (struct rrd_rra_def *)ptr; + ptr += sizeof (struct rrd_rra_def) * file->stat_head->rra_cnt; + file->live_head = (struct rrd_live_head *)ptr; + ptr += sizeof (struct rrd_live_head); + file->pdp_prep = (struct rrd_pdp_prep *)ptr; + ptr += sizeof (struct rrd_pdp_prep) * file->stat_head->ds_cnt; + file->cdp_prep = (struct rrd_cdp_prep *)ptr; + ptr += sizeof (struct rrd_cdp_prep) * file->stat_head->rra_cnt * file->stat_head->ds_cnt; + file->rra_ptr = (struct rrd_rra_ptr *)ptr; + if (completed) { + ptr += sizeof (struct rrd_rra_ptr) * file->stat_head->rra_cnt; + file->rrd_value = (gdouble *)ptr; + } + else { + file->rrd_value = NULL; + } +} + +/** + * Open completed or incompleted rrd file + * @param filename + * @param completed + * @param err + * @return + */ +static struct rspamd_rrd_file* +rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err) +{ + struct rspamd_rrd_file *new; + gint fd; + struct stat st; + + if (!rspamd_rrd_check_file (filename, completed, err)) { + return NULL; + } + + new = g_slice_alloc0 (sizeof (struct rspamd_rrd_file)); + + if (new == NULL) { + g_set_error (err, rrd_error_quark (), ENOMEM, "not enough memory"); + return NULL; + } + + /* Open file */ + fd = open (filename, O_RDWR); + if (fd == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno)); + return FALSE; + } + + if (fstat (fd, &st) == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno)); + close (fd); + return FALSE; + } + /* Mmap file */ + new->size = st.st_size; + if ((new->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) { + close (fd); + g_set_error (err, rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno)); + g_slice_free1 (sizeof (struct rspamd_rrd_file), new); + return NULL; + } + + close (fd); + + /* Adjust pointers */ + rspamd_rrd_adjust_pointers (new, completed); + + /* Mark it as finalized */ + new->finalized = completed; + + new->filename = g_strdup (filename); + + return new; +} + /** * Open (and mmap) existing RRD file * @param filename path @@ -33,7 +221,7 @@ struct rspamd_rrd_file* rspamd_rrd_open (const gchar *filename, GError **err) { - return NULL; + return rspamd_rrd_open_common (filename, TRUE, err); } /** @@ -45,10 +233,117 @@ rspamd_rrd_open (const gchar *filename, GError **err) * @param err error pointer * @return TRUE if file has been created */ -gboolean +struct rspamd_rrd_file* rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err) { - return FALSE; + struct rspamd_rrd_file *new; + struct rrd_file_head head; + struct rrd_ds_def ds; + struct rrd_rra_def rra; + struct rrd_live_head lh; + struct rrd_pdp_prep pdp; + struct rrd_cdp_prep cdp; + struct rrd_rra_ptr rra_ptr; + gint fd; + guint i, j; + + /* Open file */ + fd = open (filename, O_RDWR | O_CREAT | O_TRUNC, 0644); + if (fd == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd create error: %s", strerror (errno)); + return NULL; + } + + /* Fill header */ + memset (&head, 0, sizeof (head)); + head.rra_cnt = rra_count; + head.ds_cnt = ds_count; + head.pdp_step = pdp_step; + memcpy (head.cookie, RRD_COOKIE, sizeof (head.cookie)); + memcpy (head.version, RRD_VERSION, sizeof (head.version)); + head.float_cookie = RRD_FLOAT_COOKIE; + + if (write (fd, &head, sizeof (head)) != sizeof (head)) { + close (fd); + g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); + return NULL; + } + + /* Fill DS section */ + memset (&ds.ds_nam, 0, sizeof (ds.ds_nam)); + memcpy (&ds.dst, "COUNTER", sizeof ("COUNTER")); + memset (&ds.par, 0, sizeof (ds.par)); + for (i = 0; i < ds_count; i ++) { + if (write (fd, &ds, sizeof (ds)) != sizeof (ds)) { + close (fd); + g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); + return NULL; + } + } + + /* Fill RRA section */ + memcpy (&rra.cf_nam, "AVERAGE", sizeof ("AVERAGE")); + rra.pdp_cnt = 1; + memset (&rra.par, 0, sizeof (rra.par)); + for (i = 0; i < rra_count; i ++) { + if (write (fd, &rra, sizeof (rra)) != sizeof (rra)) { + close (fd); + g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); + return NULL; + } + } + + /* Fill live header */ + lh.last_up = time (NULL) - 10; + lh.last_up_usec = 0; + + if (write (fd, &lh, sizeof (lh)) != sizeof (lh)) { + close (fd); + g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); + return NULL; + } + + /* Fill pdp prep */ + memcpy (&pdp.last_ds, "U", sizeof ("U")); + memset (&pdp.scratch, 0, sizeof (pdp.scratch)); + pdp.scratch[PDP_val].dv = 0.; + pdp.scratch[PDP_unkn_sec_cnt].lv = lh.last_up % pdp_step; + for (i = 0; i < ds_count; i ++) { + if (write (fd, &pdp, sizeof (pdp)) != sizeof (pdp)) { + close (fd); + g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); + return NULL; + } + } + + /* Fill cdp prep */ + memset (&cdp.scratch, 0, sizeof (cdp.scratch)); + cdp.scratch[CDP_val].dv = NAN; + for (i = 0; i < rra_count; i ++) { + cdp.scratch[CDP_unkn_pdp_cnt].lv = ((lh.last_up - pdp.scratch[PDP_unkn_sec_cnt].lv) % (pdp_step * rra.pdp_cnt)) / pdp_step; + for (j = 0; j < ds_count; j ++) { + if (write (fd, &cdp, sizeof (cdp)) != sizeof (cdp)) { + close (fd); + g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); + return NULL; + } + } + } + + /* Set row pointers */ + memset (&rra_ptr, 0, sizeof (rra_ptr)); + for (i = 0; i < rra_count; i ++) { + if (write (fd, &rra_ptr, sizeof (rra_ptr)) != sizeof (rra_ptr)) { + close (fd); + g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); + return NULL; + } + } + + close (fd); + new = rspamd_rrd_open_common (filename, FALSE, err); + + return new; } /** @@ -59,9 +354,18 @@ rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gul * @return TRUE if data sources were added */ gboolean -rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err) +rspamd_rrd_add_ds (struct rspamd_rrd_file *file, GArray *ds, GError **err) { - return FALSE; + + if (file == NULL || file->stat_head->ds_cnt != ds->len * sizeof (struct rrd_ds_def)) { + g_set_error (err, rrd_error_quark (), EINVAL, "rrd add ds failed: wrong arguments"); + return FALSE; + } + + /* Straightforward memcpy */ + memcpy (file->ds_def, ds->data, ds->len); + + return TRUE; } /** @@ -72,9 +376,17 @@ rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err) * @return TRUE if archives were added */ gboolean -rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err) +rspamd_rrd_add_rra (struct rspamd_rrd_file *file, GArray *rra, GError **err) { - return FALSE; + if (file == NULL || file->stat_head->rra_cnt != rra->len * sizeof (struct rrd_rra_def)) { + g_set_error (err, rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments"); + return FALSE; + } + + /* Straightforward memcpy */ + memcpy (file->rra_def, rra->data, rra->len); + + return TRUE; } /** @@ -84,9 +396,76 @@ rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err) * @return TRUE if rrd file is ready for use */ gboolean -rspamd_rrd_finalize (const gchar *filename, GError **err) +rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err) { - return FALSE; + gint fd; + guint i, count = 0; + gdouble vbuf[1024]; + struct stat st; + + if (file == NULL || file->filename == NULL) { + g_set_error (err, rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments"); + return FALSE; + } + + fd = open (file->filename, O_RDWR); + if (fd == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno)); + return FALSE; + } + + if (lseek (fd, 0, SEEK_END) == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd seek error: %s", strerror (errno)); + close (fd); + return FALSE; + } + + /* Adjust CDP */ + for (i = 0; i < file->stat_head->rra_cnt; i ++) { + file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv = + ((file->live_head->last_up - file->pdp_prep->scratch[PDP_unkn_sec_cnt].lv) % (file->stat_head->pdp_step * + file->rra_def[i].pdp_cnt)) / file->stat_head->pdp_step; + /* Randomize row pointer */ + file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt; + /* Calculate values count */ + count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt; + } + + munmap (file->map, file->size); + /* Write values */ + for (i = 0; i < G_N_ELEMENTS (vbuf); i ++) { + vbuf[i] = NAN; + } + + while (count > 0) { + /* Write values in buffered matter */ + if (write (fd, vbuf, MIN (G_N_ELEMENTS (vbuf), count) * sizeof (gdouble)) == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno)); + close (fd); + return FALSE; + } + count -= G_N_ELEMENTS (vbuf); + } + + if (fstat (fd, &st) == -1) { + g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno)); + close (fd); + return FALSE; + } + + /* Mmap again */ + file->size = st.st_size; + if ((file->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) { + close (fd); + g_set_error (err, rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno)); + g_slice_free1 (sizeof (struct rspamd_rrd_file), file); + return FALSE; + } + close (fd); + /* Adjust pointers */ + rspamd_rrd_adjust_pointers (file, TRUE); + + return TRUE; } /** @@ -100,5 +479,52 @@ rspamd_rrd_finalize (const gchar *filename, GError **err) gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err) { - return FALSE; + gdouble *row; + guint i; + + if (file == NULL || file->stat_head->ds_cnt != points->len * sizeof (gdouble) || rra_idx >= file->stat_head->rra_cnt) { + g_set_error (err, rrd_error_quark (), EINVAL, "rrd add points failed: wrong arguments"); + return FALSE; + } + + row = file->rrd_value; + /* Skip unaffected rra */ + for (i = 0; i < rra_idx; i ++) { + row += file->rra_def[i].row_cnt * file->stat_head->ds_cnt; + } + + row += file->rra_ptr[rra_idx].cur_row * file->stat_head->ds_cnt; + + /* Increase row index */ + file->rra_ptr[rra_idx].cur_row ++; + if (file->rra_ptr[rra_idx].cur_row >= file->rra_def[rra_idx].row_cnt) { + file->rra_ptr[rra_idx].cur_row = 0; + } + + /* Write data */ + memcpy (row, points, points->len); + + return TRUE; +} + +/** + * Close rrd file + * @param file + * @return + */ +gint +rspamd_rrd_close (struct rspamd_rrd_file* file) +{ + if (file == NULL) { + errno = EINVAL; + return -1; + } + + munmap (file->map, file->size); + if (file->filename != NULL) { + g_free (file->filename); + } + g_slice_free1 (sizeof (struct rspamd_rrd_file), file); + + return 0; } @@ -260,15 +260,20 @@ struct rrd_rra_ptr { /* Final rrd file structure */ struct rspamd_rrd_file { - struct rrd_file_head *stat_head; /* the static header */ - struct rrd_ds_def *ds_def; /* list of data source definitions */ - struct rrd_rra_def *rra_def; /* list of round robin archive def */ - struct rrd_live_head *live_head; /* rrd v >= 3 last_up with us */ - struct rrd_pdp_prep *pdp_prep; /* pdp data prep area */ - struct rrd_cdp_prep *cdp_prep; /* cdp prep area */ - struct rrd_rra_ptr *rra_ptr; /* list of rra pointers */ - rrd_value_t *rrd_value; /* list of rrd values */ -} rrd_t; + struct rrd_file_head *stat_head; /* the static header */ + struct rrd_ds_def *ds_def; /* list of data source definitions */ + struct rrd_rra_def *rra_def; /* list of round robin archive def */ + struct rrd_live_head *live_head; /* rrd v >= 3 last_up with us */ + struct rrd_pdp_prep *pdp_prep; /* pdp data prep area */ + struct rrd_cdp_prep *cdp_prep; /* cdp prep area */ + struct rrd_rra_ptr *rra_ptr; /* list of rra pointers */ + gdouble *rrd_value; /* list of rrd values */ + + gchar *filename; + guint8* map; /* mmapped area */ + gsize size; /* its size */ + gboolean finalized; +}; /* Public API */ @@ -290,7 +295,7 @@ struct rspamd_rrd_file* rspamd_rrd_open (const gchar *filename, GError **err); * @param err error pointer * @return TRUE if file has been created */ -gboolean rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err); +struct rspamd_rrd_file* rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err); /** * Add data sources to rrd file @@ -299,7 +304,7 @@ gboolean rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_c * @param err error pointer * @return TRUE if data sources were added */ -gboolean rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err); +gboolean rspamd_rrd_add_ds (struct rspamd_rrd_file* file, GArray *ds, GError **err); /** * Add round robin archives to rrd file @@ -308,7 +313,7 @@ gboolean rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err); * @param err error pointer * @return TRUE if archives were added */ -gboolean rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err); +gboolean rspamd_rrd_add_rra (struct rspamd_rrd_file *file, GArray *rra, GError **err); /** * Finalize rrd file header and initialize all RRA in the file @@ -316,7 +321,7 @@ gboolean rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err); * @param err error pointer * @return TRUE if rrd file is ready for use */ -gboolean rspamd_rrd_finalize (const gchar *filename, GError **err); +gboolean rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err); /** * Add record to rrd file @@ -328,4 +333,11 @@ gboolean rspamd_rrd_finalize (const gchar *filename, GError **err); */ gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err); +/** + * Close rrd file + * @param file + * @return + */ +gint rspamd_rrd_close (struct rspamd_rrd_file* file); + #endif /* RRD_H_ */ |