diff options
Diffstat (limited to 'src/libserver/binlog.c')
-rw-r--r-- | src/libserver/binlog.c | 283 |
1 files changed, 196 insertions, 87 deletions
diff --git a/src/libserver/binlog.c b/src/libserver/binlog.c index ec191eac7..ee0bf86bc 100644 --- a/src/libserver/binlog.c +++ b/src/libserver/binlog.c @@ -22,9 +22,9 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#include "config.h" #include "binlog.h" #include "cfg_file.h" +#include "config.h" #include "tokenizers/tokenizers.h" #define BINLOG_SUFFIX ".binlog" @@ -35,7 +35,7 @@ static GHashTable *binlog_opened = NULL; static rspamd_mempool_t *binlog_pool = NULL; -static gboolean +static gboolean binlog_write_header (struct rspamd_binlog *log) { struct rspamd_binlog_header header = { @@ -43,27 +43,35 @@ binlog_write_header (struct rspamd_binlog *log) .version = VALID_VERSION, .padding = { '\0', '\0' }, }; - + header.create_time = time (NULL); lock_file (log->fd, FALSE); - + if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) { - msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + msg_warn ("cannot write file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); return FALSE; } memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header)); - + /* Metaindex */ log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); /* Offset to metaindex */ - log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + sizeof (struct rspamd_binlog_header); + log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + + sizeof (struct rspamd_binlog_header); - if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) { + if (write (log->fd, log->metaindex, + sizeof (struct rspamd_binlog_metaindex)) == -1) { g_free (log->metaindex); - msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + msg_warn ("cannot write file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); unlock_file (log->fd, FALSE); return FALSE; } @@ -71,9 +79,13 @@ binlog_write_header (struct rspamd_binlog *log) /* Alloc, write, mmap */ log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); bzero (log->cur_idx, sizeof (struct rspamd_index_block)); - if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { + if (write (log->fd, log->cur_idx, + sizeof (struct rspamd_index_block)) == -1) { g_free (log->cur_idx); - msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + msg_warn ("cannot write file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); unlock_file (log->fd, FALSE); return FALSE; } @@ -88,14 +100,20 @@ binlog_check_file (struct rspamd_binlog *log) { static gchar valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION; - if (read (log->fd, &log->header, sizeof (struct rspamd_binlog_header)) != sizeof (struct rspamd_binlog_header)) { - msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno)); + if (read (log->fd, &log->header, + sizeof (struct rspamd_binlog_header)) != + sizeof (struct rspamd_binlog_header)) { + msg_warn ("cannot read file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); return FALSE; } /* Now check all fields */ - if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || - memcmp (&log->header.version, valid_version, sizeof (valid_version)) != 0) { + if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || + memcmp (&log->header.version, valid_version, + sizeof (valid_version)) != 0) { msg_warn ("cannot validate file %s"); return FALSE; } @@ -103,24 +121,38 @@ binlog_check_file (struct rspamd_binlog *log) if (log->metaindex == NULL) { log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); } - if ((read (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex))) != sizeof (struct rspamd_binlog_metaindex)) { - msg_warn ("cannot read metaindex of file %s, error %d, %s", log->filename, errno, strerror (errno)); + if ((read (log->fd, log->metaindex, + sizeof (struct rspamd_binlog_metaindex))) != + sizeof (struct rspamd_binlog_metaindex)) { + msg_warn ("cannot read metaindex of file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); return FALSE; } /* Current index */ if (log->cur_idx == NULL) { log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); } - if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) { - msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], + SEEK_SET) == -1) { + msg_info ("cannot seek in file: %s, error: %s", + log->filename, + strerror (errno)); return FALSE; } - if ((read (log->fd, log->cur_idx, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { - msg_warn ("cannot read index in file %s, error %d, %s", log->filename, errno, strerror (errno)); + if ((read (log->fd, log->cur_idx, + sizeof (struct rspamd_index_block))) != + sizeof (struct rspamd_index_block)) { + msg_warn ("cannot read index in file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); return FALSE; } - log->cur_seq = log->metaindex->last_index * BINLOG_IDX_LEN + log->cur_idx->last_index; + log->cur_seq = log->metaindex->last_index * BINLOG_IDX_LEN + + log->cur_idx->last_index; log->cur_time = log->cur_idx->indexes[log->cur_idx->last_index].time; return TRUE; @@ -130,8 +162,13 @@ binlog_check_file (struct rspamd_binlog *log) static gboolean binlog_create (struct rspamd_binlog *log) { - if ((log->fd = open (log->filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) { - msg_info ("cannot create file %s, error %d, %s", log->filename, errno, strerror (errno)); + if ((log->fd = + open (log->filename, O_RDWR | O_TRUNC | O_CREAT, + S_IWUSR | S_IRUSR)) == -1) { + msg_info ("cannot create file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); return FALSE; } @@ -142,19 +179,25 @@ static gboolean binlog_open_real (struct rspamd_binlog *log) { if ((log->fd = open (log->filename, O_RDWR)) == -1) { - msg_info ("cannot open file %s, error %d, %s", log->filename, errno, strerror (errno)); + msg_info ("cannot open file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); return FALSE; } - + return binlog_check_file (log); } -struct rspamd_binlog* -binlog_open (rspamd_mempool_t *pool, const gchar *path, time_t rotate_time, gint rotate_jitter) +struct rspamd_binlog * +binlog_open (rspamd_mempool_t *pool, + const gchar *path, + time_t rotate_time, + gint rotate_jitter) { struct rspamd_binlog *new; - gint len = strlen (path); + gint len = strlen (path); struct stat st; new = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_binlog)); @@ -165,15 +208,16 @@ binlog_open (rspamd_mempool_t *pool, const gchar *path, time_t rotate_time, gint if (rotate_time) { new->rotate_jitter = g_random_int_range (0, rotate_jitter); } - + new->filename = rspamd_mempool_alloc (pool, len + sizeof (BINLOG_SUFFIX)); - rspamd_strlcpy (new->filename, path, len + 1); + rspamd_strlcpy (new->filename, path, len + 1); rspamd_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX)); if (stat (new->filename, &st) == -1) { /* Check errno to check whether we should create this file */ if (errno != ENOENT) { - msg_err ("cannot stat file: %s, error %s", new->filename, strerror (errno)); + msg_err ("cannot stat file: %s, error %s", new->filename, + strerror (errno)); return NULL; } else { @@ -193,7 +237,7 @@ binlog_open (rspamd_mempool_t *pool, const gchar *path, time_t rotate_time, gint return new; } -void +void binlog_close (struct rspamd_binlog *log) { if (log) { @@ -210,16 +254,18 @@ binlog_close (struct rspamd_binlog *log) static gboolean binlog_tree_callback (gpointer key, gpointer value, gpointer data) { - token_node_t *node = key; - struct rspamd_binlog *log = data; - struct rspamd_binlog_element elt; + token_node_t *node = key; + struct rspamd_binlog *log = data; + struct rspamd_binlog_element elt; elt.h1 = node->h1; elt.h2 = node->h2; elt.value = node->value; - + if (write (log->fd, &elt, sizeof (elt)) == -1) { - msg_info ("cannot write token to file: %s, error: %s", log->filename, strerror (errno)); + msg_info ("cannot write token to file: %s, error: %s", + log->filename, + strerror (errno)); return TRUE; } @@ -233,12 +279,14 @@ write_binlog_tree (struct rspamd_binlog *log, GTree *nodes) struct rspamd_binlog_index *idx; lock_file (log->fd, FALSE); - log->cur_seq ++; + log->cur_seq++; /* Seek to end of file */ if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + msg_info ("cannot seek in file: %s, error: %s", + log->filename, + strerror (errno)); return FALSE; } @@ -251,21 +299,28 @@ write_binlog_tree (struct rspamd_binlog *log, GTree *nodes) idx->time = (guint64)time (NULL); log->cur_time = idx->time; idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element); - if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) { + if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], + SEEK_SET) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s, seek: %L, op: insert index", log->filename, - strerror (errno), log->metaindex->indexes[log->metaindex->last_index]); + msg_info ( + "cannot seek in file: %s, error: %s, seek: %L, op: insert index", + log->filename, + strerror (errno), + log->metaindex->indexes[log->metaindex->last_index]); return FALSE; } - log->cur_idx->last_index ++; - if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { + log->cur_idx->last_index++; + if (write (log->fd, log->cur_idx, + sizeof (struct rspamd_index_block)) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot write index to file: %s, error: %s", log->filename, strerror (errno)); + msg_info ("cannot write index to file: %s, error: %s", + log->filename, + strerror (errno)); return FALSE; } unlock_file (log->fd, FALSE); - + return TRUE; } @@ -275,18 +330,24 @@ create_new_metaindex_block (struct rspamd_binlog *log) off_t seek; lock_file (log->fd, FALSE); - - log->metaindex->last_index ++; + + log->metaindex->last_index++; /* Seek to end of file */ if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + msg_info ("cannot seek in file: %s, error: %s", + log->filename, + strerror (errno)); return FALSE; } - if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { + if (write (log->fd, log->cur_idx, + sizeof (struct rspamd_index_block)) == -1) { unlock_file (log->fd, FALSE); g_free (log->cur_idx); - msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); + msg_warn ("cannot write file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); return FALSE; } /* Offset to metaindex */ @@ -294,12 +355,17 @@ create_new_metaindex_block (struct rspamd_binlog *log) /* Overwrite all metaindexes */ if (lseek (log->fd, sizeof (struct rspamd_binlog_header), SEEK_SET) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + msg_info ("cannot seek in file: %s, error: %s", + log->filename, + strerror (errno)); return FALSE; } - if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) { + if (write (log->fd, log->metaindex, + sizeof (struct rspamd_binlog_metaindex)) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot write metaindex in file: %s, error: %s", log->filename, strerror (errno)); + msg_info ("cannot write metaindex in file: %s, error: %s", + log->filename, + strerror (errno)); return FALSE; } bzero (log->cur_idx, sizeof (struct rspamd_index_block)); @@ -311,9 +377,11 @@ create_new_metaindex_block (struct rspamd_binlog *log) static gboolean maybe_rotate_binlog (struct rspamd_binlog *log) { - guint64 now = time (NULL); + guint64 now = time (NULL); - if (log->rotate_time && ((now - log->header.create_time) > (guint)(log->rotate_time + log->rotate_jitter))) { + if (log->rotate_time && + ((now - log->header.create_time) > + (guint)(log->rotate_time + log->rotate_jitter))) { return TRUE; } return FALSE; @@ -322,7 +390,7 @@ maybe_rotate_binlog (struct rspamd_binlog *log) static gboolean rotate_binlog (struct rspamd_binlog *log) { - gchar *backup_name; + gchar *backup_name; struct stat st; lock_file (log->fd, FALSE); @@ -355,13 +423,14 @@ rotate_binlog (struct rspamd_binlog *log) } -gboolean +gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes) { off_t seek; if (!log || !log->metaindex || !log->cur_idx || !nodes) { - msg_info ("improperly opened binlog: %s", log != NULL ? log->filename : "unknown"); + msg_info ("improperly opened binlog: %s", + log != NULL ? log->filename : "unknown"); return FALSE; } @@ -381,7 +450,9 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes) if (log->metaindex->last_index < METAINDEX_LEN) { /* Create new index block */ if ((seek = lseek (log->fd, 0, SEEK_END)) == (off_t)-1) { - msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); + msg_info ("cannot seek in file: %s, error: %s", + log->filename, + strerror (errno)); return FALSE; } if (!create_new_metaindex_block (log)) { @@ -389,7 +460,7 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes) } return write_binlog_tree (log, nodes); } - + /* All binlog is filled, we need to rotate it forcefully */ if (!rotate_binlog (log)) { return FALSE; @@ -398,16 +469,20 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes) return write_binlog_tree (log, nodes); } -gboolean -binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GByteArray **rep) +gboolean +binlog_sync (struct rspamd_binlog *log, + guint64 from_rev, + guint64 *from_time, + GByteArray **rep) { - guint32 metaindex_num; + guint32 metaindex_num; struct rspamd_index_block *idxb; struct rspamd_binlog_index *idx; gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE; if (!log || !log->metaindex || !log->cur_idx) { - msg_info ("improperly opened binlog: %s", log != NULL ? log->filename : "unknown"); + msg_info ("improperly opened binlog: %s", + log != NULL ? log->filename : "unknown"); return FALSE; } @@ -419,7 +494,7 @@ binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GB /* Unmap old fragment */ g_free ((*rep)->data); } - + if (from_rev == log->cur_seq) { /* Last record */ *rep = NULL; @@ -427,7 +502,11 @@ binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GB } else if (from_rev > log->cur_seq) { /* Slave has more actual copy, write this to log and abort sync */ - msg_warn ("slave has more recent revision of statfile %s: %uL and our is: %uL", log->filename, from_rev, log->cur_seq); + msg_warn ( + "slave has more recent revision of statfile %s: %uL and our is: %uL", + log->filename, + from_rev, + log->cur_seq); *rep = NULL; *from_time = 0; return FALSE; @@ -443,15 +522,24 @@ binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GB lock_file (log->fd, FALSE); idxb = g_malloc (sizeof (struct rspamd_index_block)); idx_mapped = TRUE; - if (lseek (log->fd, log->metaindex->indexes[metaindex_num], SEEK_SET) == -1) { + if (lseek (log->fd, log->metaindex->indexes[metaindex_num], + SEEK_SET) == -1) { unlock_file (log->fd, FALSE); - msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno)); + msg_warn ("cannot seek file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); res = FALSE; goto end; } - if ((read (log->fd, idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { + if ((read (log->fd, idxb, + sizeof (struct rspamd_index_block))) != + sizeof (struct rspamd_index_block)) { unlock_file (log->fd, FALSE); - msg_warn ("cannot read index from file %s, error %d, %s", log->filename, errno, strerror (errno)); + msg_warn ("cannot read index from file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); res = FALSE; goto end; } @@ -474,17 +562,27 @@ binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GB /* Now fill reply structure */ (*rep)->len = idx->len; /* Read result */ - msg_info ("update from binlog '%s' from revision: %uL to revision %uL size is %uL", - log->filename, from_rev, log->cur_seq, idx->len); + msg_info ( + "update from binlog '%s' from revision: %uL to revision %uL size is %uL", + log->filename, + from_rev, + log->cur_seq, + idx->len); if (lseek (log->fd, idx->seek, SEEK_SET) == -1) { - msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno)); + msg_warn ("cannot seek file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); res = FALSE; goto end; } - + (*rep)->data = g_malloc (idx->len); if ((read (log->fd, (*rep)->data, idx->len)) != (ssize_t)idx->len) { - msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno)); + msg_warn ("cannot read file %s, error %d, %s", + log->filename, + errno, + strerror (errno)); res = FALSE; goto end; } @@ -517,8 +615,11 @@ maybe_init_static (void) return TRUE; } -gboolean -maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile_config *st, stat_file_t *file, GTree *nodes) +gboolean +maybe_write_binlog (struct rspamd_classifier_config *ccf, + struct rspamd_statfile_config *st, + stat_file_t *file, + GTree *nodes) { struct rspamd_binlog *log; @@ -526,8 +627,9 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile return FALSE; } - - if (st == NULL || nodes == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) { + + if (st == NULL || nodes == NULL || st->binlog == NULL || + st->binlog->affinity != AFFINITY_MASTER) { return FALSE; } @@ -536,7 +638,9 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile } if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { - if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) { + if ((log = + binlog_open (binlog_pool, st->path, st->binlog->rotate_time, + st->binlog->rotate_time / 2)) != NULL) { g_hash_table_insert (binlog_opened, st, log); } else { @@ -545,7 +649,9 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile } if (binlog_insert (log, nodes)) { - msg_info ("set new revision of statfile %s: %uL", st->symbol, log->cur_seq); + msg_info ("set new revision of statfile %s: %uL", + st->symbol, + log->cur_seq); (void)statfile_set_revision (file, log->cur_seq, log->cur_time); return TRUE; } @@ -553,12 +659,13 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile return FALSE; } -struct rspamd_binlog* +struct rspamd_binlog * get_binlog_by_statfile (struct rspamd_statfile_config *st) { struct rspamd_binlog *log; - if (st == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) { + if (st == NULL || st->binlog == NULL || st->binlog->affinity != + AFFINITY_MASTER) { return NULL; } @@ -567,13 +674,15 @@ get_binlog_by_statfile (struct rspamd_statfile_config *st) } if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { - if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) { + if ((log = + binlog_open (binlog_pool, st->path, st->binlog->rotate_time, + st->binlog->rotate_time / 2)) != NULL) { g_hash_table_insert (binlog_opened, st, log); } else { return NULL; } } - + return log; } |