#include "config.h"
#include "rrd.h"
+#include "util.h"
static GQuark
rrd_error_quark (void)
return "U";
}
+void
+rrd_make_default_rra (const gchar *cf_name, gulong pdp_cnt, gulong rows, struct rrd_rra_def *rra)
+{
+ rra->pdp_cnt = pdp_cnt;
+ rra->row_cnt = rows;
+ rspamd_strlcpy (rra->cf_nam, cf_name, sizeof (rra->cf_nam));
+ memset (rra->par, 0, sizeof (rra->par));
+ rra->par[RRA_cdp_xff_val].dv = 0.5;
+}
+
+void
+rrd_make_default_ds (const gchar *name, gulong pdp_step, struct rrd_ds_def *ds)
+{
+ rspamd_strlcpy (ds->ds_nam, name, sizeof (ds->ds_nam));
+ rspamd_strlcpy (ds->dst, "AVERAGE", sizeof (ds->dst));
+ memset (ds->par, 0, sizeof (ds->par));
+ ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2;
+ ds->par[RRD_DS_min_val].dv = NAN;
+ ds->par[RRD_DS_max_val].dv = NAN;
+}
+
/**
* Check rrd file for correctness (size, cookies, etc)
*/
return TRUE;
}
+static gboolean
+rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble *pdp_new, gdouble interval)
+{
+ guint i;
+ enum rrd_dst_type type;
+
+ for (i = 0; i < file->stat_head->ds_cnt; i ++) {
+ type = rrd_dst_from_string (file->ds_def[i].ds_nam);
+
+ if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv < interval) {
+ rspamd_strlcpy (file->pdp_prep[i].last_ds, "U", sizeof (file->pdp_prep[i].last_ds));
+ }
+
+ if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv >= interval) {
+ switch (type) {
+ case RRD_DST_COUNTER:
+ case RRD_DST_DERIVE:
+ if (file->pdp_prep[i].last_ds[0] == 'U') {
+ pdp_new[i] = NAN;
+ }
+ else {
+ pdp_new[i] = vals[i] - strtod (file->pdp_prep[i].last_ds, NULL);
+ }
+ break;
+ case RRD_DST_GAUGE:
+ pdp_new[i] = vals[i] * interval;
+ break;
+ case RRD_DST_ABSOLUTE:
+ pdp_new[i] = vals[i];
+ break;
+ default:
+ return FALSE;
+ }
+ }
+ else {
+ pdp_new[i] = NAN;
+ }
+ /* Copy value to the last_ds */
+ if (!isnan (vals[i])) {
+ rspamd_snprintf (file->pdp_prep[i].last_ds, sizeof (file->pdp_prep[i].last_ds), "%.4f", vals[i]);
+ }
+ else {
+ file->pdp_prep[i].last_ds[0] = 'U';
+ file->pdp_prep[i].last_ds[1] = '\0';
+ }
+ }
+
+
+ return TRUE;
+}
+
+static void
+rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdouble *pdp_temp, gdouble interval,
+ gdouble pre_int, gdouble post_int, gulong pdp_diff)
+{
+ guint i;
+ rrd_value_t *scratch;
+ gulong heartbeat;
+
+
+ for (i = 0; i < file->stat_head->ds_cnt; i ++) {
+ scratch = file->pdp_prep[i].scratch;
+ heartbeat = file->ds_def[i].par[RRD_DS_mrhb_cnt].lv;
+ if (!isnan (pdp_new[i])) {
+ if (isnan (scratch[PDP_val].dv)) {
+ scratch[PDP_val].dv = 0;
+ }
+ scratch[PDP_val].dv += pdp_new[i] / interval * pre_int;
+ }
+ /* Check interval value for heartbeat for this DS */
+ if ((interval > heartbeat) || (file->stat_head->pdp_step / 2.0 < scratch[PDP_unkn_sec_cnt].lv)) {
+ pdp_temp[i] = NAN;
+ }
+ else {
+ pdp_temp[i] = scratch[PDP_val].dv /
+ ((double) (pdp_diff - scratch[PDP_unkn_sec_cnt].lv) - pre_int);
+ }
+
+ if (isnan (pdp_new[i])) {
+ scratch[PDP_unkn_sec_cnt].lv = floor (post_int);
+ scratch[PDP_val].dv = NAN;
+ } else {
+ scratch[PDP_unkn_sec_cnt].lv = 0;
+ scratch[PDP_val].dv = pdp_new[i] / interval * post_int;
+ }
+ }
+}
+
/**
* Add record to rrd file
* @param file rrd file object
gboolean
rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err)
{
- gdouble *row;
+ gdouble *row, interval, *pdp_new, *pdp_temp, pre_int, post_int;
guint i;
+ gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
+ prev_pdp_age, cur_pdp_age;
+ struct timeval tv;
if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len || rra_idx >= file->stat_head->rra_cnt) {
g_set_error (err, rrd_error_quark (), EINVAL, "rrd add points failed: wrong arguments");
}
row = file->rrd_value;
+ /* Get interval */
+ gettimeofday (&tv, NULL);
+ interval = (gdouble)(tv.tv_sec - file->live_head->last_up) +
+ (gdouble)(tv.tv_usec - file->live_head->last_up_usec) / 1e6f;
+
+ /* Update PDP preparation values */
+ pdp_new = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
+ pdp_temp = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
+
+ if (!rspamd_rrd_update_pdp_prep (file, (gdouble *)points->data, pdp_new, interval)) {
+ g_set_error (err, rrd_error_quark (), EINVAL, "rrd update pdp failed: wrong arguments");
+ g_free (pdp_new);
+ g_free (pdp_temp);
+ return FALSE;
+ }
+
+ /* Calculate elapsed steps */
+ prev_pdp_age = file->live_head->last_up % file->stat_head->pdp_step;
+ prev_pdp_step = file->live_head->last_up - prev_pdp_age;
+ cur_pdp_age = tv.tv_sec % file->stat_head->pdp_step;
+ cur_pdp_step = tv.tv_sec - cur_pdp_age;
+
+ if (cur_pdp_step > prev_pdp_step) {
+ pre_int = (cur_pdp_step - file->live_head->last_up) - ((double)file->live_head->last_up_usec) / 1e6f;
+ post_int = cur_pdp_age + ((double)tv.tv_usec) / 1e6f;
+
+ }
+ else {
+ pre_int = interval;
+ post_int = 0;
+ }
+ cur_pdp_count = cur_pdp_step / file->stat_head->pdp_step;
+ pdp_steps = (cur_pdp_step - prev_pdp_step) / file->stat_head->pdp_step;
+
+
+ if (pdp_steps == 0) {
+ /* Simple update of pdp prep */
+ for (i = 0; i < file->stat_head->ds_cnt; i ++) {
+ if (isnan (pdp_new[i])) {
+ /* Increment unknown period */
+ file->pdp_prep[i].scratch[PDP_unkn_sec_cnt].lv += floor (interval);
+ }
+ else {
+ if (isnan (file->pdp_prep[i].scratch[PDP_val].dv)) {
+ /* Reset pdp to the current value */
+ file->pdp_prep[i].scratch[PDP_val].dv = pdp_new[i];
+ }
+ else {
+ /* Increment pdp value */
+ file->pdp_prep[i].scratch[PDP_val].dv += pdp_new[i];
+ }
+ }
+ }
+ }
+ else {
+ /* Complex update of PDP, CDP and RRA */
+
+ /* Update PDP for this step */
+ rspamd_rrd_update_pdp_step (file, pdp_new, pdp_temp, interval, pre_int, post_int, pdp_steps * file->stat_head->pdp_step);
+
+ }
+
/* Skip unaffected rra */
for (i = 0; i < rra_idx; i ++) {
row += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
/* Write data */
memcpy (row, points->data, points->len);
+ g_free (pdp_new);
+ g_free (pdp_temp);
+
return TRUE;
}