diff options
Diffstat (limited to 'src/libserver/binlog.c')
-rw-r--r-- | src/libserver/binlog.c | 283 |
1 files changed, 87 insertions, 196 deletions
diff --git a/src/libserver/binlog.c b/src/libserver/binlog.c index ee0bf86bc..ec191eac7 100644 --- a/src/libserver/binlog.c +++ b/src/libserver/binlog.c @@ -22,9 +22,9 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "config.h" #include "binlog.h" #include "cfg_file.h" -#include "config.h" #include "tokenizers/tokenizers.h" #define BINLOG_SUFFIX ".binlog" @@ -35,7 +35,7 @@ static GHashTable *binlog_opened = NULL; static rspamd_mempool_t *binlog_pool = NULL; -static gboolean +static gboolean binlog_write_header (struct rspamd_binlog *log) { struct rspamd_binlog_header header = { @@ -43,35 +43,27 @@ binlog_write_header (struct rspamd_binlog *log) .version = VALID_VERSION, .padding = { '\0', '\0' }, }; - + header.create_time = time (NULL); lock_file (log->fd, FALSE); - + if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) { - msg_warn ("cannot write file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header)); - + /* Metaindex */ log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex)); /* Offset to metaindex */ - log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + - sizeof (struct rspamd_binlog_header); + log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + sizeof (struct rspamd_binlog_header); - if (write (log->fd, log->metaindex, - sizeof (struct rspamd_binlog_metaindex)) == -1) { + if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) { g_free (log->metaindex); - msg_warn ("cannot write file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); unlock_file (log->fd, FALSE); return FALSE; } @@ -79,13 +71,9 @@ binlog_write_header (struct rspamd_binlog *log) /* Alloc, write, mmap */ log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); bzero (log->cur_idx, sizeof (struct rspamd_index_block)); - if (write (log->fd, log->cur_idx, - sizeof (struct rspamd_index_block)) == -1) { + if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { g_free (log->cur_idx); - msg_warn ("cannot write file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); unlock_file (log->fd, FALSE); return FALSE; } @@ -100,20 +88,14 @@ binlog_check_file (struct rspamd_binlog *log) { static gchar valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION; - if (read (log->fd, &log->header, - sizeof (struct rspamd_binlog_header)) != - sizeof (struct rspamd_binlog_header)) { - msg_warn ("cannot read file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + if (read (log->fd, &log->header, sizeof (struct rspamd_binlog_header)) != sizeof (struct rspamd_binlog_header)) { + msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } /* Now check all fields */ - if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || - memcmp (&log->header.version, valid_version, - sizeof (valid_version)) != 0) { + if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 || + memcmp (&log->header.version, valid_version, sizeof (valid_version)) != 0) { msg_warn ("cannot validate file %s"); return FALSE; } @@ -121,38 +103,24 @@ binlog_check_file (struct rspamd_binlog *log) if (log->metaindex == NULL) { log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex)); } - if ((read (log->fd, log->metaindex, - sizeof (struct rspamd_binlog_metaindex))) != - sizeof (struct rspamd_binlog_metaindex)) { - msg_warn ("cannot read metaindex of file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + if ((read (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex))) != sizeof (struct rspamd_binlog_metaindex)) { + msg_warn ("cannot read metaindex of file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } /* Current index */ if (log->cur_idx == NULL) { log->cur_idx = g_malloc (sizeof (struct rspamd_index_block)); } - if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], - SEEK_SET) == -1) { - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); + if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) { + msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } - if ((read (log->fd, log->cur_idx, - sizeof (struct rspamd_index_block))) != - sizeof (struct rspamd_index_block)) { - msg_warn ("cannot read index in file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + if ((read (log->fd, log->cur_idx, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { + msg_warn ("cannot read index in file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } - log->cur_seq = log->metaindex->last_index * BINLOG_IDX_LEN + - log->cur_idx->last_index; + log->cur_seq = log->metaindex->last_index * BINLOG_IDX_LEN + log->cur_idx->last_index; log->cur_time = log->cur_idx->indexes[log->cur_idx->last_index].time; return TRUE; @@ -162,13 +130,8 @@ binlog_check_file (struct rspamd_binlog *log) static gboolean binlog_create (struct rspamd_binlog *log) { - if ((log->fd = - open (log->filename, O_RDWR | O_TRUNC | O_CREAT, - S_IWUSR | S_IRUSR)) == -1) { - msg_info ("cannot create file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + if ((log->fd = open (log->filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) { + msg_info ("cannot create file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } @@ -179,25 +142,19 @@ static gboolean binlog_open_real (struct rspamd_binlog *log) { if ((log->fd = open (log->filename, O_RDWR)) == -1) { - msg_info ("cannot open file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + msg_info ("cannot open file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } - + return binlog_check_file (log); } -struct rspamd_binlog * -binlog_open (rspamd_mempool_t *pool, - const gchar *path, - time_t rotate_time, - gint rotate_jitter) +struct rspamd_binlog* +binlog_open (rspamd_mempool_t *pool, const gchar *path, time_t rotate_time, gint rotate_jitter) { struct rspamd_binlog *new; - gint len = strlen (path); + gint len = strlen (path); struct stat st; new = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_binlog)); @@ -208,16 +165,15 @@ binlog_open (rspamd_mempool_t *pool, if (rotate_time) { new->rotate_jitter = g_random_int_range (0, rotate_jitter); } - + new->filename = rspamd_mempool_alloc (pool, len + sizeof (BINLOG_SUFFIX)); - rspamd_strlcpy (new->filename, path, len + 1); + rspamd_strlcpy (new->filename, path, len + 1); rspamd_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX)); if (stat (new->filename, &st) == -1) { /* Check errno to check whether we should create this file */ if (errno != ENOENT) { - msg_err ("cannot stat file: %s, error %s", new->filename, - strerror (errno)); + msg_err ("cannot stat file: %s, error %s", new->filename, strerror (errno)); return NULL; } else { @@ -237,7 +193,7 @@ binlog_open (rspamd_mempool_t *pool, return new; } -void +void binlog_close (struct rspamd_binlog *log) { if (log) { @@ -254,18 +210,16 @@ binlog_close (struct rspamd_binlog *log) static gboolean binlog_tree_callback (gpointer key, gpointer value, gpointer data) { - token_node_t *node = key; - struct rspamd_binlog *log = data; - struct rspamd_binlog_element elt; + token_node_t *node = key; + struct rspamd_binlog *log = data; + struct rspamd_binlog_element elt; elt.h1 = node->h1; elt.h2 = node->h2; elt.value = node->value; - + if (write (log->fd, &elt, sizeof (elt)) == -1) { - msg_info ("cannot write token to file: %s, error: %s", - log->filename, - strerror (errno)); + msg_info ("cannot write token to file: %s, error: %s", log->filename, strerror (errno)); return TRUE; } @@ -279,14 +233,12 @@ write_binlog_tree (struct rspamd_binlog *log, GTree *nodes) struct rspamd_binlog_index *idx; lock_file (log->fd, FALSE); - log->cur_seq++; + log->cur_seq ++; /* Seek to end of file */ if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); + msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } @@ -299,28 +251,21 @@ write_binlog_tree (struct rspamd_binlog *log, GTree *nodes) idx->time = (guint64)time (NULL); log->cur_time = idx->time; idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element); - if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], - SEEK_SET) == -1) { + if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) { unlock_file (log->fd, FALSE); - msg_info ( - "cannot seek in file: %s, error: %s, seek: %L, op: insert index", - log->filename, - strerror (errno), - log->metaindex->indexes[log->metaindex->last_index]); + msg_info ("cannot seek in file: %s, error: %s, seek: %L, op: insert index", log->filename, + strerror (errno), log->metaindex->indexes[log->metaindex->last_index]); return FALSE; } - log->cur_idx->last_index++; - if (write (log->fd, log->cur_idx, - sizeof (struct rspamd_index_block)) == -1) { + log->cur_idx->last_index ++; + if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot write index to file: %s, error: %s", - log->filename, - strerror (errno)); + msg_info ("cannot write index to file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } unlock_file (log->fd, FALSE); - + return TRUE; } @@ -330,24 +275,18 @@ create_new_metaindex_block (struct rspamd_binlog *log) off_t seek; lock_file (log->fd, FALSE); - - log->metaindex->last_index++; + + log->metaindex->last_index ++; /* Seek to end of file */ if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); + msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } - if (write (log->fd, log->cur_idx, - sizeof (struct rspamd_index_block)) == -1) { + if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) { unlock_file (log->fd, FALSE); g_free (log->cur_idx); - msg_warn ("cannot write file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno)); return FALSE; } /* Offset to metaindex */ @@ -355,17 +294,12 @@ create_new_metaindex_block (struct rspamd_binlog *log) /* Overwrite all metaindexes */ if (lseek (log->fd, sizeof (struct rspamd_binlog_header), SEEK_SET) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); + msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } - if (write (log->fd, log->metaindex, - sizeof (struct rspamd_binlog_metaindex)) == -1) { + if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) { unlock_file (log->fd, FALSE); - msg_info ("cannot write metaindex in file: %s, error: %s", - log->filename, - strerror (errno)); + msg_info ("cannot write metaindex in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } bzero (log->cur_idx, sizeof (struct rspamd_index_block)); @@ -377,11 +311,9 @@ create_new_metaindex_block (struct rspamd_binlog *log) static gboolean maybe_rotate_binlog (struct rspamd_binlog *log) { - guint64 now = time (NULL); + guint64 now = time (NULL); - if (log->rotate_time && - ((now - log->header.create_time) > - (guint)(log->rotate_time + log->rotate_jitter))) { + if (log->rotate_time && ((now - log->header.create_time) > (guint)(log->rotate_time + log->rotate_jitter))) { return TRUE; } return FALSE; @@ -390,7 +322,7 @@ maybe_rotate_binlog (struct rspamd_binlog *log) static gboolean rotate_binlog (struct rspamd_binlog *log) { - gchar *backup_name; + gchar *backup_name; struct stat st; lock_file (log->fd, FALSE); @@ -423,14 +355,13 @@ rotate_binlog (struct rspamd_binlog *log) } -gboolean +gboolean binlog_insert (struct rspamd_binlog *log, GTree *nodes) { off_t seek; if (!log || !log->metaindex || !log->cur_idx || !nodes) { - msg_info ("improperly opened binlog: %s", - log != NULL ? log->filename : "unknown"); + msg_info ("improperly opened binlog: %s", log != NULL ? log->filename : "unknown"); return FALSE; } @@ -450,9 +381,7 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes) if (log->metaindex->last_index < METAINDEX_LEN) { /* Create new index block */ if ((seek = lseek (log->fd, 0, SEEK_END)) == (off_t)-1) { - msg_info ("cannot seek in file: %s, error: %s", - log->filename, - strerror (errno)); + msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno)); return FALSE; } if (!create_new_metaindex_block (log)) { @@ -460,7 +389,7 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes) } return write_binlog_tree (log, nodes); } - + /* All binlog is filled, we need to rotate it forcefully */ if (!rotate_binlog (log)) { return FALSE; @@ -469,20 +398,16 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes) return write_binlog_tree (log, nodes); } -gboolean -binlog_sync (struct rspamd_binlog *log, - guint64 from_rev, - guint64 *from_time, - GByteArray **rep) +gboolean +binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GByteArray **rep) { - guint32 metaindex_num; + guint32 metaindex_num; struct rspamd_index_block *idxb; struct rspamd_binlog_index *idx; gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE; if (!log || !log->metaindex || !log->cur_idx) { - msg_info ("improperly opened binlog: %s", - log != NULL ? log->filename : "unknown"); + msg_info ("improperly opened binlog: %s", log != NULL ? log->filename : "unknown"); return FALSE; } @@ -494,7 +419,7 @@ binlog_sync (struct rspamd_binlog *log, /* Unmap old fragment */ g_free ((*rep)->data); } - + if (from_rev == log->cur_seq) { /* Last record */ *rep = NULL; @@ -502,11 +427,7 @@ binlog_sync (struct rspamd_binlog *log, } else if (from_rev > log->cur_seq) { /* Slave has more actual copy, write this to log and abort sync */ - msg_warn ( - "slave has more recent revision of statfile %s: %uL and our is: %uL", - log->filename, - from_rev, - log->cur_seq); + msg_warn ("slave has more recent revision of statfile %s: %uL and our is: %uL", log->filename, from_rev, log->cur_seq); *rep = NULL; *from_time = 0; return FALSE; @@ -522,24 +443,15 @@ binlog_sync (struct rspamd_binlog *log, lock_file (log->fd, FALSE); idxb = g_malloc (sizeof (struct rspamd_index_block)); idx_mapped = TRUE; - if (lseek (log->fd, log->metaindex->indexes[metaindex_num], - SEEK_SET) == -1) { + if (lseek (log->fd, log->metaindex->indexes[metaindex_num], SEEK_SET) == -1) { unlock_file (log->fd, FALSE); - msg_warn ("cannot seek file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; goto end; } - if ((read (log->fd, idxb, - sizeof (struct rspamd_index_block))) != - sizeof (struct rspamd_index_block)) { + if ((read (log->fd, idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) { unlock_file (log->fd, FALSE); - msg_warn ("cannot read index from file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + msg_warn ("cannot read index from file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; goto end; } @@ -562,27 +474,17 @@ binlog_sync (struct rspamd_binlog *log, /* Now fill reply structure */ (*rep)->len = idx->len; /* Read result */ - msg_info ( - "update from binlog '%s' from revision: %uL to revision %uL size is %uL", - log->filename, - from_rev, - log->cur_seq, - idx->len); + msg_info ("update from binlog '%s' from revision: %uL to revision %uL size is %uL", + log->filename, from_rev, log->cur_seq, idx->len); if (lseek (log->fd, idx->seek, SEEK_SET) == -1) { - msg_warn ("cannot seek file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; goto end; } - + (*rep)->data = g_malloc (idx->len); if ((read (log->fd, (*rep)->data, idx->len)) != (ssize_t)idx->len) { - msg_warn ("cannot read file %s, error %d, %s", - log->filename, - errno, - strerror (errno)); + msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno)); res = FALSE; goto end; } @@ -615,11 +517,8 @@ maybe_init_static (void) return TRUE; } -gboolean -maybe_write_binlog (struct rspamd_classifier_config *ccf, - struct rspamd_statfile_config *st, - stat_file_t *file, - GTree *nodes) +gboolean +maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile_config *st, stat_file_t *file, GTree *nodes) { struct rspamd_binlog *log; @@ -627,9 +526,8 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, return FALSE; } - - if (st == NULL || nodes == NULL || st->binlog == NULL || - st->binlog->affinity != AFFINITY_MASTER) { + + if (st == NULL || nodes == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) { return FALSE; } @@ -638,9 +536,7 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, } if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { - if ((log = - binlog_open (binlog_pool, st->path, st->binlog->rotate_time, - st->binlog->rotate_time / 2)) != NULL) { + if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) { g_hash_table_insert (binlog_opened, st, log); } else { @@ -649,9 +545,7 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, } if (binlog_insert (log, nodes)) { - msg_info ("set new revision of statfile %s: %uL", - st->symbol, - log->cur_seq); + msg_info ("set new revision of statfile %s: %uL", st->symbol, log->cur_seq); (void)statfile_set_revision (file, log->cur_seq, log->cur_time); return TRUE; } @@ -659,13 +553,12 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, return FALSE; } -struct rspamd_binlog * +struct rspamd_binlog* get_binlog_by_statfile (struct rspamd_statfile_config *st) { struct rspamd_binlog *log; - if (st == NULL || st->binlog == NULL || st->binlog->affinity != - AFFINITY_MASTER) { + if (st == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) { return NULL; } @@ -674,15 +567,13 @@ get_binlog_by_statfile (struct rspamd_statfile_config *st) } if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) { - if ((log = - binlog_open (binlog_pool, st->path, st->binlog->rotate_time, - st->binlog->rotate_time / 2)) != NULL) { + if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) { g_hash_table_insert (binlog_opened, st, log); } else { return NULL; } } - + return log; } |