/* * Copyright 2024 Vsevolod Stakhov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "config.h" #include "rrd.h" #include "util.h" #include "cfg_file.h" #include "logger.h" #include "unix-std.h" #include "cryptobox.h" #include #define RSPAMD_RRD_DS_COUNT METRIC_ACTION_MAX #define RSPAMD_RRD_OLD_DS_COUNT 4 #define RSPAMD_RRD_RRA_COUNT 4 #define msg_err_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \ "rrd", file->id, \ G_STRFUNC, \ __VA_ARGS__) #define msg_warn_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \ "rrd", file->id, \ G_STRFUNC, \ __VA_ARGS__) #define msg_info_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \ "rrd", file->id, \ G_STRFUNC, \ __VA_ARGS__) #define msg_debug_rrd(...) rspamd_conditional_debug_fast(NULL, NULL, \ rspamd_rrd_log_id, "rrd", file->id, \ G_STRFUNC, \ __VA_ARGS__) INIT_LOG_MODULE(rrd) static GQuark rrd_error_quark(void) { return g_quark_from_static_string("rrd-error"); } /** * Convert rrd dst type from string to numeric value */ enum rrd_dst_type rrd_dst_from_string(const char *str) { if (g_ascii_strcasecmp(str, "counter") == 0) { return RRD_DST_COUNTER; } else if (g_ascii_strcasecmp(str, "absolute") == 0) { return RRD_DST_ABSOLUTE; } else if (g_ascii_strcasecmp(str, "gauge") == 0) { return RRD_DST_GAUGE; } else if (g_ascii_strcasecmp(str, "cdef") == 0) { return RRD_DST_CDEF; } else if (g_ascii_strcasecmp(str, "derive") == 0) { return RRD_DST_DERIVE; } return RRD_DST_INVALID; } /** * Convert numeric presentation of dst to string */ const char * rrd_dst_to_string(enum rrd_dst_type type) { switch (type) { case RRD_DST_COUNTER: return "COUNTER"; case RRD_DST_ABSOLUTE: return "ABSOLUTE"; case RRD_DST_GAUGE: return "GAUGE"; case RRD_DST_CDEF: return "CDEF"; case RRD_DST_DERIVE: return "DERIVE"; default: return "U"; } return "U"; } /** * Convert rrd consolidation function type from string to numeric value */ enum rrd_cf_type rrd_cf_from_string(const char *str) { if (g_ascii_strcasecmp(str, "average") == 0) { return RRD_CF_AVERAGE; } else if (g_ascii_strcasecmp(str, "minimum") == 0) { return RRD_CF_MINIMUM; } else if (g_ascii_strcasecmp(str, "maximum") == 0) { return RRD_CF_MAXIMUM; } else if (g_ascii_strcasecmp(str, "last") == 0) { return RRD_CF_LAST; } /* XXX: add other CF functions supported by rrd */ return RRD_CF_INVALID; } /** * Convert numeric presentation of cf to string */ const char * rrd_cf_to_string(enum rrd_cf_type type) { switch (type) { case RRD_CF_AVERAGE: return "AVERAGE"; case RRD_CF_MINIMUM: return "MINIMUM"; case RRD_CF_MAXIMUM: return "MAXIMUM"; case RRD_CF_LAST: return "LAST"; default: return "U"; } /* XXX: add other CF functions supported by rrd */ return "U"; } void rrd_make_default_rra(const char *cf_name, gulong pdp_cnt, gulong rows, struct rrd_rra_def *rra) { g_assert(cf_name != NULL); g_assert(rrd_cf_from_string(cf_name) != RRD_CF_INVALID); rra->pdp_cnt = pdp_cnt; rra->row_cnt = rows; rspamd_strlcpy(rra->cf_nam, cf_name, sizeof(rra->cf_nam)); memset(rra->par, 0, sizeof(rra->par)); rra->par[RRA_cdp_xff_val].dv = 0.5; } void rrd_make_default_ds(const char *name, const char *type, gulong pdp_step, struct rrd_ds_def *ds) { g_assert(name != NULL); g_assert(type != NULL); g_assert(rrd_dst_from_string(type) != RRD_DST_INVALID); rspamd_strlcpy(ds->ds_nam, name, sizeof(ds->ds_nam)); rspamd_strlcpy(ds->dst, type, sizeof(ds->dst)); memset(ds->par, 0, sizeof(ds->par)); ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2; ds->par[RRD_DS_min_val].dv = NAN; ds->par[RRD_DS_max_val].dv = NAN; } /** * Check rrd file for correctness (size, cookies, etc) */ static gboolean rspamd_rrd_check_file(const char *filename, gboolean need_data, GError **err) { int fd, i; struct stat st; struct rrd_file_head head; struct rrd_rra_def rra; int head_size; fd = open(filename, O_RDWR); if (fd == -1) { g_set_error(err, rrd_error_quark(), errno, "rrd open error: %s", strerror(errno)); return FALSE; } if (fstat(fd, &st) == -1) { g_set_error(err, rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno)); close(fd); return FALSE; } if (st.st_size < (goffset) sizeof(struct rrd_file_head)) { /* We have trimmed file */ g_set_error(err, rrd_error_quark(), EINVAL, "rrd size is bad: %ud", (unsigned int) st.st_size); close(fd); return FALSE; } /* Try to read header */ if (read(fd, &head, sizeof(head)) != sizeof(head)) { g_set_error(err, rrd_error_quark(), errno, "rrd read head error: %s", strerror(errno)); close(fd); return FALSE; } /* Check magic */ if (memcmp(head.version, RRD_VERSION, sizeof(head.version)) != 0) { g_set_error(err, rrd_error_quark(), EINVAL, "rrd head error: bad cookie"); close(fd); return FALSE; } if (head.float_cookie != RRD_FLOAT_COOKIE) { g_set_error(err, rrd_error_quark(), EINVAL, "rrd head error: another architecture " "(file cookie %g != our cookie %g)", head.float_cookie, RRD_FLOAT_COOKIE); close(fd); return FALSE; } /* Check for other params */ if (head.ds_cnt <= 0 || head.rra_cnt <= 0) { g_set_error(err, rrd_error_quark(), EINVAL, "rrd head cookies error: bad rra or ds count"); close(fd); return FALSE; } /* Now we can calculate the overall size of rrd */ head_size = sizeof(struct rrd_file_head) + sizeof(struct rrd_ds_def) * head.ds_cnt + sizeof(struct rrd_rra_def) * head.rra_cnt + sizeof(struct rrd_live_head) + sizeof(struct rrd_pdp_prep) * head.ds_cnt + sizeof(struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt + sizeof(struct rrd_rra_ptr) * head.rra_cnt; if (st.st_size < (goffset) head_size) { g_set_error(err, rrd_error_quark(), errno, "rrd file seems to have stripped header: %d", head_size); close(fd); return FALSE; } if (need_data) { /* Now check rra */ if (lseek(fd, sizeof(struct rrd_ds_def) * head.ds_cnt, SEEK_CUR) == -1) { g_set_error(err, rrd_error_quark(), errno, "rrd head lseek error: %s", strerror(errno)); close(fd); return FALSE; } for (i = 0; i < (int) head.rra_cnt; i++) { if (read(fd, &rra, sizeof(rra)) != sizeof(rra)) { g_set_error(err, rrd_error_quark(), errno, "rrd read rra error: %s", strerror(errno)); close(fd); return FALSE; } head_size += rra.row_cnt * head.ds_cnt * sizeof(double); } if (st.st_size != head_size) { g_set_error(err, rrd_error_quark(), EINVAL, "rrd file seems to have incorrect size: %d, must be %d", (int) st.st_size, head_size); close(fd); return FALSE; } } close(fd); return TRUE; } /** * Adjust pointers in mmapped rrd file * @param file */ static void rspamd_rrd_adjust_pointers(struct rspamd_rrd_file *file, gboolean completed) { uint8_t *ptr; ptr = file->map; file->stat_head = (struct rrd_file_head *) ptr; ptr += sizeof(struct rrd_file_head); file->ds_def = (struct rrd_ds_def *) ptr; ptr += sizeof(struct rrd_ds_def) * file->stat_head->ds_cnt; file->rra_def = (struct rrd_rra_def *) ptr; ptr += sizeof(struct rrd_rra_def) * file->stat_head->rra_cnt; file->live_head = (struct rrd_live_head *) ptr; ptr += sizeof(struct rrd_live_head); file->pdp_prep = (struct rrd_pdp_prep *) ptr; ptr += sizeof(struct rrd_pdp_prep) * file->stat_head->ds_cnt; file->cdp_prep = (struct rrd_cdp_prep *) ptr; ptr += sizeof(struct rrd_cdp_prep) * file->stat_head->rra_cnt * file->stat_head->ds_cnt; file->rra_ptr = (struct rrd_rra_ptr *) ptr; if (completed) { ptr += sizeof(struct rrd_rra_ptr) * file->stat_head->rra_cnt; file->rrd_value = (double *) ptr; } else { file->rrd_value = NULL; } } static void rspamd_rrd_calculate_checksum(struct rspamd_rrd_file *file) { unsigned char sigbuf[rspamd_cryptobox_HASHBYTES]; struct rrd_ds_def *ds; unsigned int i; rspamd_cryptobox_hash_state_t st; if (file->finalized) { rspamd_cryptobox_hash_init(&st, NULL, 0); rspamd_cryptobox_hash_update(&st, file->filename, strlen(file->filename)); for (i = 0; i < file->stat_head->ds_cnt; i++) { ds = &file->ds_def[i]; rspamd_cryptobox_hash_update(&st, ds->ds_nam, sizeof(ds->ds_nam)); } rspamd_cryptobox_hash_final(&st, sigbuf); file->id = rspamd_encode_base32(sigbuf, sizeof(sigbuf), RSPAMD_BASE32_DEFAULT); } } static int rspamd_rrd_open_exclusive(const char *filename) { struct timespec sleep_ts = { .tv_sec = 0, .tv_nsec = 1000000}; int fd; fd = open(filename, O_RDWR); if (fd == -1) { return -1; } for (;;) { if (rspamd_file_lock(fd, TRUE) == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { nanosleep(&sleep_ts, NULL); continue; } else { close(fd); return -1; } } else { break; } } return fd; }; /** * Open completed or incompleted rrd file * @param filename * @param completed * @param err * @return */ static struct rspamd_rrd_file * rspamd_rrd_open_common(const char *filename, gboolean completed, GError **err) { struct rspamd_rrd_file *file; int fd; struct stat st; if (!rspamd_rrd_check_file(filename, completed, err)) { return NULL; } file = g_malloc0(sizeof(struct rspamd_rrd_file)); /* Open file */ fd = rspamd_rrd_open_exclusive(filename); if (fd == -1) { g_set_error(err, rrd_error_quark(), errno, "rrd open error: %s", strerror(errno)); g_free(file); return FALSE; } if (fstat(fd, &st) == -1) { g_set_error(err, rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno)); rspamd_file_unlock(fd, FALSE); g_free(file); close(fd); return FALSE; } /* Mmap file */ file->size = st.st_size; if ((file->map = mmap(NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) { rspamd_file_unlock(fd, FALSE); close(fd); g_set_error(err, rrd_error_quark(), ENOMEM, "mmap failed: %s", strerror(errno)); g_free(file); return NULL; } file->fd = fd; /* Adjust pointers */ rspamd_rrd_adjust_pointers(file, completed); /* Mark it as finalized */ file->finalized = completed; file->filename = g_strdup(filename); rspamd_rrd_calculate_checksum(file); return file; } /** * Open (and mmap) existing RRD file * @param filename path * @param err error pointer * @return rrd file structure */ struct rspamd_rrd_file * rspamd_rrd_open(const char *filename, GError **err) { struct rspamd_rrd_file *file; if ((file = rspamd_rrd_open_common(filename, TRUE, err))) { msg_info_rrd("rrd file opened: %s", filename); } return file; } /** * Create basic header for rrd file * @param filename file path * @param ds_count number of data sources * @param rra_count number of round robin archives * @param pdp_step step of primary data points * @param err error pointer * @return TRUE if file has been created */ struct rspamd_rrd_file * rspamd_rrd_create(const char *filename, gulong ds_count, gulong rra_count, gulong pdp_step, double initial_ticks, GError **err) { struct rspamd_rrd_file *new; struct rrd_file_head head; struct rrd_ds_def ds; struct rrd_rra_def rra; struct rrd_live_head lh; struct rrd_pdp_prep pdp; struct rrd_cdp_prep cdp; struct rrd_rra_ptr rra_ptr; int fd; unsigned int i, j; /* Open file */ fd = open(filename, O_RDWR | O_CREAT | O_EXCL, 0644); if (fd == -1) { g_set_error(err, rrd_error_quark(), errno, "rrd create error: %s", strerror(errno)); return NULL; } rspamd_file_lock(fd, FALSE); /* Fill header */ memset(&head, 0, sizeof(head)); head.rra_cnt = rra_count; head.ds_cnt = ds_count; head.pdp_step = pdp_step; memcpy(head.cookie, RRD_COOKIE, sizeof(head.cookie)); memcpy(head.version, RRD_VERSION, sizeof(head.version)); head.float_cookie = RRD_FLOAT_COOKIE; if (write(fd, &head, sizeof(head)) != sizeof(head)) { rspamd_file_unlock(fd, FALSE); close(fd); g_set_error(err, rrd_error_quark(), errno, "rrd write error: %s", strerror(errno)); return NULL; } /* Fill DS section */ memset(&ds, 0, sizeof(ds)); memset(&ds.ds_nam, 0, sizeof(ds.ds_nam)); memcpy(&ds.dst, "COUNTER", sizeof("COUNTER")); memset(&ds.par, 0, sizeof(ds.par)); for (i = 0; i < ds_count; i++) { if (write(fd, &ds, sizeof(ds)) != sizeof(ds)) { rspamd_file_unlock(fd, FALSE); close(fd); g_set_error(err, rrd_error_quark(), errno, "rrd write error: %s", strerror(errno)); return NULL; } } /* Fill RRA section */ memset(&rra, 0, sizeof(rra)); memcpy(&rra.cf_nam, "AVERAGE", sizeof("AVERAGE")); rra.pdp_cnt = 1; memset(&rra.par, 0, sizeof(rra.par)); for (i = 0; i < rra_count; i++) { if (write(fd, &rra, sizeof(rra)) != sizeof(rra)) { rspamd_file_unlock(fd, FALSE); close(fd); g_set_error(err, rrd_error_quark(), errno, "rrd write error: %s", strerror(errno)); return NULL; } } /* Fill live header */ memset(&lh, 0, sizeof(lh)); lh.last_up = (glong) initial_ticks; lh.last_up_usec = (glong) ((initial_ticks - lh.last_up) * 1e6f); if (write(fd, &lh, sizeof(lh)) != sizeof(lh)) { rspamd_file_unlock(fd, FALSE); close(fd); g_set_error(err, rrd_error_quark(), errno, "rrd write error: %s", strerror(errno)); return NULL; } /* Fill pdp prep */ memset(&pdp, 0, sizeof(pdp)); memcpy(&pdp.last_ds, "U", sizeof("U")); memset(&pdp.scratch, 0, sizeof(pdp.scratch)); pdp.scratch[PDP_val].dv = NAN; pdp.scratch[PDP_unkn_sec_cnt].lv = 0; for (i = 0; i < ds_count; i++) { if (write(fd, &pdp, sizeof(pdp)) != sizeof(pdp)) { rspamd_file_unlock(fd, FALSE); close(fd); g_set_error(err, rrd_error_quark(), errno, "rrd write error: %s", strerror(errno)); return NULL; } } /* Fill cdp prep */ memset(&cdp, 0, sizeof(cdp)); memset(&cdp.scratch, 0, sizeof(cdp.scratch)); cdp.scratch[CDP_val].dv = NAN; cdp.scratch[CDP_unkn_pdp_cnt].lv = 0; for (i = 0; i < rra_count; i++) { for (j = 0; j < ds_count; j++) { if (write(fd, &cdp, sizeof(cdp)) != sizeof(cdp)) { rspamd_file_unlock(fd, FALSE); close(fd); g_set_error(err, rrd_error_quark(), errno, "rrd write error: %s", strerror(errno)); return NULL; } } } /* Set row pointers */ memset(&rra_ptr, 0, sizeof(rra_ptr)); for (i = 0; i < rra_count; i++) { if (write(fd, &rra_ptr, sizeof(rra_ptr)) != sizeof(rra_ptr)) { rspamd_file_unlock(fd, FALSE); close(fd); g_set_error(err, rrd_error_quark(), errno, "rrd write error: %s", strerror(errno)); return NULL; } } rspamd_file_unlock(fd, FALSE); close(fd); new = rspamd_rrd_open_common(filename, FALSE, err); return new; } /** * Add data sources to rrd file * @param filename path to file * @param ds array of struct rrd_ds_def * @param err error pointer * @return TRUE if data sources were added */ gboolean rspamd_rrd_add_ds(struct rspamd_rrd_file *file, GArray *ds, GError **err) { if (file == NULL || file->stat_head->ds_cnt * sizeof(struct rrd_ds_def) != ds->len) { g_set_error(err, rrd_error_quark(), EINVAL, "rrd add ds failed: wrong arguments"); return FALSE; } /* Straightforward memcpy */ memcpy(file->ds_def, ds->data, ds->len); return TRUE; } /** * Add round robin archives to rrd file * @param filename path to file * @param ds array of struct rrd_rra_def * @param err error pointer * @return TRUE if archives were added */ gboolean rspamd_rrd_add_rra(struct rspamd_rrd_file *file, GArray *rra, GError **err) { if (file == NULL || file->stat_head->rra_cnt * sizeof(struct rrd_rra_def) != rra->len) { g_set_error(err, rrd_error_quark(), EINVAL, "rrd add rra failed: wrong arguments"); return FALSE; } /* Straightforward memcpy */ memcpy(file->rra_def, rra->data, rra->len); return TRUE; } /** * Finalize rrd file header and initialize all RRA in the file * @param filename file path * @param err error pointer * @return TRUE if rrd file is ready for use */ gboolean rspamd_rrd_finalize(struct rspamd_rrd_file *file, GError **err) { int fd; unsigned int i; int count = 0; double vbuf[1024]; struct stat st; if (file == NULL || file->filename == NULL || file->fd == -1) { g_set_error(err, rrd_error_quark(), EINVAL, "rrd add rra failed: wrong arguments"); return FALSE; } fd = file->fd; if (lseek(fd, 0, SEEK_END) == -1) { g_set_error(err, rrd_error_quark(), errno, "rrd seek error: %s", strerror(errno)); close(fd); return FALSE; } /* Adjust CDP */ for (i = 0; i < file->stat_head->rra_cnt; i++) { file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv = 0; /* Randomize row pointer (disabled) */ /* file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt; */ file->rra_ptr->cur_row = file->rra_def[i].row_cnt - 1; /* Calculate values count */ count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt; } munmap(file->map, file->size); /* Write values */ for (i = 0; i < G_N_ELEMENTS(vbuf); i++) { vbuf[i] = NAN; } while (count > 0) { /* Write values in buffered matter */ if (write(fd, vbuf, MIN((int) G_N_ELEMENTS(vbuf), count) * sizeof(double)) == -1) { g_set_error(err, rrd_error_quark(), errno, "rrd write error: %s", strerror(errno)); close(fd); return FALSE; } count -= G_N_ELEMENTS(vbuf); } if (fstat(fd, &st) == -1) { g_set_error(err, rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno)); close(fd); return FALSE; } /* Mmap again */ file->size = st.st_size; if ((file->map = mmap(NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) { close(fd); g_set_error(err, rrd_error_quark(), ENOMEM, "mmap failed: %s", strerror(errno)); return FALSE; } /* Adjust pointers */ rspamd_rrd_adjust_pointers(file, TRUE); file->finalized = TRUE; rspamd_rrd_calculate_checksum(file); msg_info_rrd("rrd file created: %s", file->filename); return TRUE; } /** * Update pdp_prep data * @param file rrd file * @param vals new values * @param pdp_new new pdp array * @param interval time elapsed from the last update * @return */ static gboolean rspamd_rrd_update_pdp_prep(struct rspamd_rrd_file *file, double *vals, double *pdp_new, double interval) { unsigned int i; enum rrd_dst_type type; for (i = 0; i < file->stat_head->ds_cnt; i++) { type = rrd_dst_from_string(file->ds_def[i].dst); if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv < interval) { rspamd_strlcpy(file->pdp_prep[i].last_ds, "U", sizeof(file->pdp_prep[i].last_ds)); pdp_new[i] = NAN; msg_debug_rrd("adding unknown point interval %.3f is less than heartbeat %l", interval, file->ds_def[i].par[RRD_DS_mrhb_cnt].lv); } else { switch (type) { case RRD_DST_COUNTER: case RRD_DST_DERIVE: if (file->pdp_prep[i].last_ds[0] == 'U') { pdp_new[i] = NAN; msg_debug_rrd("last point is NaN for point %ud", i); } else { pdp_new[i] = vals[i] - strtod(file->pdp_prep[i].last_ds, NULL); msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]); } break; case RRD_DST_GAUGE: pdp_new[i] = vals[i] * interval; msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]); break; case RRD_DST_ABSOLUTE: pdp_new[i] = vals[i]; msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]); break; default: return FALSE; } } /* Copy value to the last_ds */ if (!isnan(vals[i])) { rspamd_snprintf(file->pdp_prep[i].last_ds, sizeof(file->pdp_prep[i].last_ds), "%.4f", vals[i]); } else { file->pdp_prep[i].last_ds[0] = 'U'; file->pdp_prep[i].last_ds[1] = '\0'; } } return TRUE; } /** * Update step for this pdp * @param file * @param pdp_new new pdp array * @param pdp_temp temp pdp array * @param interval time till last update * @param pre_int pre interval * @param post_int post intervall * @param pdp_diff time till last pdp update */ static void rspamd_rrd_update_pdp_step(struct rspamd_rrd_file *file, double *pdp_new, double *pdp_temp, double interval, gulong pdp_diff) { unsigned int i; rrd_value_t *scratch; gulong heartbeat; for (i = 0; i < file->stat_head->ds_cnt; i++) { scratch = file->pdp_prep[i].scratch; heartbeat = file->ds_def[i].par[RRD_DS_mrhb_cnt].lv; if (!isnan(pdp_new[i])) { if (isnan(scratch[PDP_val].dv)) { scratch[PDP_val].dv = 0; } } /* Check interval value for heartbeat for this DS */ if ((interval > heartbeat) || (file->stat_head->pdp_step / 2.0 < scratch[PDP_unkn_sec_cnt].lv)) { pdp_temp[i] = NAN; } else { pdp_temp[i] = scratch[PDP_val].dv / ((double) (pdp_diff - scratch[PDP_unkn_sec_cnt].lv)); } if (isnan(pdp_new[i])) { scratch[PDP_unkn_sec_cnt].lv = interval; scratch[PDP_val].dv = NAN; } else { scratch[PDP_unkn_sec_cnt].lv = 0; scratch[PDP_val].dv = pdp_new[i] / interval; } msg_debug_rrd("new temp PDP %ud, %.3f -> %.3f, scratch: %3f", i, pdp_new[i], pdp_temp[i], scratch[PDP_val].dv); } } /** * Update CDP for this rra * @param file rrd file * @param pdp_steps how much pdp steps elapsed from the last update * @param pdp_offset offset from pdp * @param rra_steps how much steps must be updated for this rra * @param rra_index index of desired rra * @param pdp_temp temporary pdp points */ static void rspamd_rrd_update_cdp(struct rspamd_rrd_file *file, double pdp_steps, double pdp_offset, gulong *rra_steps, gulong rra_index, double *pdp_temp) { unsigned int i; struct rrd_rra_def *rra; rrd_value_t *scratch; enum rrd_cf_type cf; double last_cdp = INFINITY, cur_cdp = INFINITY; gulong pdp_in_cdp; rra = &file->rra_def[rra_index]; cf = rrd_cf_from_string(rra->cf_nam); /* Iterate over all DS for this RRA */ for (i = 0; i < file->stat_head->ds_cnt; i++) { /* Get CDP for this RRA and DS */ scratch = file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch; if (rra->pdp_cnt > 1) { /* Do we have any CDP to update for this rra ? */ if (rra_steps[rra_index] > 0) { if (isnan(pdp_temp[i])) { /* New pdp is nan */ /* Increment unknown points count */ scratch[CDP_unkn_pdp_cnt].lv += pdp_offset; /* Reset secondary value */ scratch[CDP_secondary_val].dv = NAN; } else { scratch[CDP_secondary_val].dv = pdp_temp[i]; } /* Check XFF for this rra */ if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt * rra->par[RRA_cdp_xff_val].lv) { /* XFF is reached */ scratch[CDP_primary_val].dv = NAN; } else { /* Need to initialize CDP using specified consolidation */ switch (cf) { case RRD_CF_AVERAGE: last_cdp = isnan(scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val].dv; cur_cdp = isnan(pdp_temp[i]) ? 0.0 : pdp_temp[i]; scratch[CDP_primary_val].dv = (last_cdp + cur_cdp * pdp_offset) / (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv); break; case RRD_CF_MAXIMUM: last_cdp = isnan(scratch[CDP_val].dv) ? -INFINITY : scratch[CDP_val].dv; cur_cdp = isnan(pdp_temp[i]) ? -INFINITY : pdp_temp[i]; scratch[CDP_primary_val].dv = MAX(last_cdp, cur_cdp); break; case RRD_CF_MINIMUM: last_cdp = isnan(scratch[CDP_val].dv) ? INFINITY : scratch[CDP_val].dv; cur_cdp = isnan(pdp_temp[i]) ? INFINITY : pdp_temp[i]; scratch[CDP_primary_val].dv = MIN(last_cdp, cur_cdp); break; case RRD_CF_LAST: default: scratch[CDP_primary_val].dv = pdp_temp[i]; last_cdp = INFINITY; break; } } /* Init carry of this CDP */ pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt; if (pdp_in_cdp == 0 || isnan(pdp_temp[i])) { /* Set overflow */ switch (cf) { case RRD_CF_AVERAGE: scratch[CDP_val].dv = 0; break; case RRD_CF_MAXIMUM: scratch[CDP_val].dv = -INFINITY; break; case RRD_CF_MINIMUM: scratch[CDP_val].dv = INFINITY; break; default: scratch[CDP_val].dv = NAN; break; } } else { /* Special carry for average */ if (cf == RRD_CF_AVERAGE) { scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp; } else { scratch[CDP_val].dv = pdp_temp[i]; } } scratch[CDP_unkn_pdp_cnt].lv = 0; msg_debug_rrd("update cdp for DS %d with value %.3f, " "stored value: %.3f, carry: %.3f", i, last_cdp, scratch[CDP_primary_val].dv, scratch[CDP_val].dv); } /* In this case we just need to update cdp_prep for this RRA */ else { if (isnan(pdp_temp[i])) { /* Just increase undefined zone */ scratch[CDP_unkn_pdp_cnt].lv += pdp_steps; } else { /* Calculate cdp value */ last_cdp = scratch[CDP_val].dv; switch (cf) { case RRD_CF_AVERAGE: if (isnan(last_cdp)) { scratch[CDP_val].dv = pdp_temp[i] * pdp_steps; } else { scratch[CDP_val].dv = last_cdp + pdp_temp[i] * pdp_steps; } break; case RRD_CF_MAXIMUM: scratch[CDP_val].dv = MAX(last_cdp, pdp_temp[i]); break; case RRD_CF_MINIMUM: scratch[CDP_val].dv = MIN(last_cdp, pdp_temp[i]); break; case RRD_CF_LAST: scratch[CDP_val].dv = pdp_temp[i]; break; default: scratch[CDP_val].dv = NAN; break; } } msg_debug_rrd("aggregate cdp %d with pdp %.3f, " "stored value: %.3f", i, pdp_temp[i], scratch[CDP_val].dv); } } else { /* We have nothing to consolidate, but we may miss some pdp */ if (pdp_steps > 2) { /* Just write PDP value */ scratch[CDP_primary_val].dv = pdp_temp[i]; scratch[CDP_secondary_val].dv = pdp_temp[i]; } } } } /** * Update RRA in a file * @param file rrd file * @param rra_steps steps for each rra * @param now current time */ void rspamd_rrd_write_rra(struct rspamd_rrd_file *file, gulong *rra_steps) { unsigned int i, j, ds_cnt; struct rrd_rra_def *rra; struct rrd_cdp_prep *cdp; double *rra_row = file->rrd_value, *cur_row; ds_cnt = file->stat_head->ds_cnt; /* Iterate over all RRA */ for (i = 0; i < file->stat_head->rra_cnt; i++) { rra = &file->rra_def[i]; if (rra_steps[i] > 0) { /* Move row ptr */ if (++file->rra_ptr[i].cur_row >= rra->row_cnt) { file->rra_ptr[i].cur_row = 0; } /* Calculate seek */ cdp = &file->cdp_prep[ds_cnt * i]; cur_row = rra_row + ds_cnt * file->rra_ptr[i].cur_row; /* Iterate over DS */ for (j = 0; j < ds_cnt; j++) { cur_row[j] = cdp[j].scratch[CDP_primary_val].dv; msg_debug_rrd("write cdp %d: %.3f", j, cur_row[j]); } } rra_row += rra->row_cnt * ds_cnt; } } /** * Add record to rrd file * @param file rrd file object * @param points points (must be row suitable for this RRA, depending on ds count) * @param err error pointer * @return TRUE if a row has been added */ gboolean rspamd_rrd_add_record(struct rspamd_rrd_file *file, GArray *points, double ticks, GError **err) { double interval, *pdp_new, *pdp_temp; unsigned int i; glong seconds, microseconds; gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step, prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset; if (file == NULL || file->stat_head->ds_cnt * sizeof(double) != points->len) { g_set_error(err, rrd_error_quark(), EINVAL, "rrd add points failed: wrong arguments"); return FALSE; } /* Get interval */ seconds = (glong) ticks; microseconds = (glong) ((ticks - seconds) * 1000000.); interval = ticks - ((double) file->live_head->last_up + file->live_head->last_up_usec / 1000000.); msg_debug_rrd("update rrd record after %.3f seconds", interval); /* Update PDP preparation values */ pdp_new = g_malloc0(sizeof(double) * file->stat_head->ds_cnt); pdp_temp = g_malloc0(sizeof(double) * file->stat_head->ds_cnt); /* How much steps need to be updated in each RRA */ rra_steps = g_malloc0(sizeof(gulong) * file->stat_head->rra_cnt); if (!rspamd_rrd_update_pdp_prep(file, (double *) points->data, pdp_new, interval)) { g_set_error(err, rrd_error_quark(), EINVAL, "rrd update pdp failed: wrong arguments"); g_free(pdp_new); g_free(pdp_temp); g_free(rra_steps); return FALSE; } /* Calculate elapsed steps */ /* Age in seconds for previous pdp store */ prev_pdp_age = file->live_head->last_up % file->stat_head->pdp_step; /* Time in seconds for last pdp update */ prev_pdp_step = file->live_head->last_up - prev_pdp_age; /* Age in seconds from current time to required pdp time */ cur_pdp_age = seconds % file->stat_head->pdp_step; /* Time of desired pdp step */ cur_pdp_step = seconds - cur_pdp_age; cur_pdp_count = cur_pdp_step / file->stat_head->pdp_step; pdp_steps = (cur_pdp_step - prev_pdp_step) / file->stat_head->pdp_step; if (pdp_steps == 0) { /* Simple update of pdp prep */ for (i = 0; i < file->stat_head->ds_cnt; i++) { if (isnan(pdp_new[i])) { /* Increment unknown period */ file->pdp_prep[i].scratch[PDP_unkn_sec_cnt].lv += floor( interval); } else { if (isnan(file->pdp_prep[i].scratch[PDP_val].dv)) { /* Reset pdp to the current value */ file->pdp_prep[i].scratch[PDP_val].dv = pdp_new[i]; } else { /* Increment pdp value */ file->pdp_prep[i].scratch[PDP_val].dv += pdp_new[i]; } } } } else { /* Complex update of PDP, CDP and RRA */ /* Update PDP for this step */ rspamd_rrd_update_pdp_step(file, pdp_new, pdp_temp, interval, pdp_steps * file->stat_head->pdp_step); /* Update CDP points for each RRA*/ for (i = 0; i < file->stat_head->rra_cnt; i++) { /* Calculate pdp offset for this RRA */ pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count % file->rra_def[i].pdp_cnt; /* How much steps we got for this RRA */ if (pdp_offset <= pdp_steps) { rra_steps[i] = (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1; } else { /* This rra have not passed enough pdp steps */ rra_steps[i] = 0; } msg_debug_rrd("cdp: %ud, rra steps: %ul(%ul), pdp steps: %ul", i, rra_steps[i], pdp_offset, pdp_steps); /* Update this specific CDP */ rspamd_rrd_update_cdp(file, pdp_steps, pdp_offset, rra_steps, i, pdp_temp); } /* Write RRA */ rspamd_rrd_write_rra(file, rra_steps); } file->live_head->last_up = seconds; file->live_head->last_up_usec = microseconds; /* Sync and invalidate */ msync(file->map, file->size, MS_ASYNC | MS_INVALIDATE); g_free(pdp_new); g_free(pdp_temp); g_free(rra_steps); return TRUE; } /** * Close rrd file * @param file * @return */ int rspamd_rrd_close(struct rspamd_rrd_file *file) { if (file == NULL) { errno = EINVAL; return -1; } munmap(file->map, file->size); close(file->fd); g_free(file->filename); g_free(file->id); g_free(file); return 0; } static struct rspamd_rrd_file * rspamd_rrd_create_file(const char *path, gboolean finalize, GError **err) { struct rspamd_rrd_file *file; struct rrd_ds_def ds[RSPAMD_RRD_DS_COUNT]; struct rrd_rra_def rra[RSPAMD_RRD_RRA_COUNT]; int i; GArray ar; /* Try to create new rrd file */ file = rspamd_rrd_create(path, RSPAMD_RRD_DS_COUNT, RSPAMD_RRD_RRA_COUNT, 1, rspamd_get_calendar_ticks(), err); if (file == NULL) { return NULL; } /* Create DS and RRA */ for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i++) { rrd_make_default_ds(rspamd_action_to_str(i), rrd_dst_to_string(RRD_DST_COUNTER), 1, &ds[i]); } ar.data = (char *) ds; ar.len = sizeof(ds); if (!rspamd_rrd_add_ds(file, &ar, err)) { rspamd_rrd_close(file); return NULL; } /* Once per minute for 1 day */ rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE), 60, 24 * 60, &rra[0]); /* Once per 5 minutes for 1 week */ rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE), 5 * 60, 7 * 24 * 60 / 5, &rra[1]); /* Once per 10 mins for 1 month */ rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE), 60 * 10, 30 * 24 * 6, &rra[2]); /* Once per hour for 1 year */ rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE), 60 * 60, 365 * 24, &rra[3]); ar.data = (char *) rra; ar.len = sizeof(rra); if (!rspamd_rrd_add_rra(file, &ar, err)) { rspamd_rrd_close(file); return NULL; } if (finalize && !rspamd_rrd_finalize(file, err)) { rspamd_rrd_close(file); return NULL; } return file; } static void rspamd_rrd_convert_ds(struct rspamd_rrd_file *old, struct rspamd_rrd_file *cur, int idx_old, int idx_new) { struct rrd_pdp_prep *pdp_prep_old, *pdp_prep_new; struct rrd_cdp_prep *cdp_prep_old, *cdp_prep_new; double *val_old, *val_new; gulong rra_cnt, i, j, points_cnt, old_ds, new_ds; rra_cnt = old->stat_head->rra_cnt; pdp_prep_old = &old->pdp_prep[idx_old]; pdp_prep_new = &cur->pdp_prep[idx_new]; memcpy(pdp_prep_new, pdp_prep_old, sizeof(*pdp_prep_new)); val_old = old->rrd_value; val_new = cur->rrd_value; old_ds = old->stat_head->ds_cnt; new_ds = cur->stat_head->ds_cnt; for (i = 0; i < rra_cnt; i++) { cdp_prep_old = &old->cdp_prep[i * old_ds] + idx_old; cdp_prep_new = &cur->cdp_prep[i * new_ds] + idx_new; memcpy(cdp_prep_new, cdp_prep_old, sizeof(*cdp_prep_new)); points_cnt = old->rra_def[i].row_cnt; for (j = 0; j < points_cnt; j++) { val_new[j * new_ds + idx_new] = val_old[j * old_ds + idx_old]; } val_new += points_cnt * new_ds; val_old += points_cnt * old_ds; } } static struct rspamd_rrd_file * rspamd_rrd_convert(const char *path, struct rspamd_rrd_file *old, GError **err) { struct rspamd_rrd_file *rrd; char tpath[PATH_MAX]; g_assert(old != NULL); rspamd_snprintf(tpath, sizeof(tpath), "%s.new", path); rrd = rspamd_rrd_create_file(tpath, TRUE, err); if (rrd) { /* Copy old data */ memcpy(rrd->live_head, old->live_head, sizeof(*rrd->live_head)); memcpy(rrd->rra_ptr, old->rra_ptr, sizeof(*old->rra_ptr) * rrd->stat_head->rra_cnt); /* * Old DSes: * 0 - spam -> reject * 1 - probable spam -> add header * 2 - greylist -> greylist * 3 - ham -> ham */ rspamd_rrd_convert_ds(old, rrd, 0, METRIC_ACTION_REJECT); rspamd_rrd_convert_ds(old, rrd, 1, METRIC_ACTION_ADD_HEADER); rspamd_rrd_convert_ds(old, rrd, 2, METRIC_ACTION_GREYLIST); rspamd_rrd_convert_ds(old, rrd, 3, METRIC_ACTION_NOACTION); if (unlink(path) == -1) { g_set_error(err, rrd_error_quark(), errno, "cannot unlink old rrd file %s: %s", path, strerror(errno)); unlink(tpath); rspamd_rrd_close(rrd); return NULL; } if (rename(tpath, path) == -1) { g_set_error(err, rrd_error_quark(), errno, "cannot rename old rrd file %s: %s", path, strerror(errno)); unlink(tpath); rspamd_rrd_close(rrd); return NULL; } } return rrd; } struct rspamd_rrd_file * rspamd_rrd_file_default(const char *path, GError **err) { struct rspamd_rrd_file *file, *nf; g_assert(path != NULL); if (access(path, R_OK) != -1) { /* We can open rrd file */ file = rspamd_rrd_open(path, err); if (file == NULL) { return NULL; } if (file->stat_head->rra_cnt != RSPAMD_RRD_RRA_COUNT) { msg_err_rrd("rrd file is not suitable for rspamd: it has " "%ul ds and %ul rra", file->stat_head->ds_cnt, file->stat_head->rra_cnt); g_set_error(err, rrd_error_quark(), EINVAL, "bad rrd file"); rspamd_rrd_close(file); return NULL; } else if (file->stat_head->ds_cnt == RSPAMD_RRD_OLD_DS_COUNT) { /* Old rrd, need to convert */ msg_info_rrd("rrd file %s is not suitable for rspamd, convert it", path); nf = rspamd_rrd_convert(path, file, err); rspamd_rrd_close(file); return nf; } else if (file->stat_head->ds_cnt == RSPAMD_RRD_DS_COUNT) { return file; } else { msg_err_rrd("rrd file is not suitable for rspamd: it has " "%ul ds and %ul rra", file->stat_head->ds_cnt, file->stat_head->rra_cnt); g_set_error(err, rrd_error_quark(), EINVAL, "bad rrd file"); rspamd_rrd_close(file); return NULL; } } file = rspamd_rrd_create_file(path, TRUE, err); return file; } struct rspamd_rrd_query_result * rspamd_rrd_query(struct rspamd_rrd_file *file, gulong rra_num) { struct rspamd_rrd_query_result *res; struct rrd_rra_def *rra; const double *rra_offset = NULL; unsigned int i; g_assert(file != NULL); if (rra_num > file->stat_head->rra_cnt) { msg_err_rrd("requested unexisting rra: %l", rra_num); return NULL; } res = g_malloc0(sizeof(*res)); res->ds_count = file->stat_head->ds_cnt; res->last_update = (double) file->live_head->last_up + ((double) file->live_head->last_up_usec / 1e6f); res->pdp_per_cdp = file->rra_def[rra_num].pdp_cnt; res->rra_rows = file->rra_def[rra_num].row_cnt; rra_offset = file->rrd_value; for (i = 0; i < file->stat_head->rra_cnt; i++) { rra = &file->rra_def[i]; if (i == rra_num) { res->cur_row = file->rra_ptr[i].cur_row % rra->row_cnt; break; } rra_offset += rra->row_cnt * res->ds_count; } res->data = rra_offset; return res; }