aboutsummaryrefslogtreecommitdiffstats
path: root/src/libserver/binlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libserver/binlog.c')
-rw-r--r--src/libserver/binlog.c283
1 files changed, 196 insertions, 87 deletions
diff --git a/src/libserver/binlog.c b/src/libserver/binlog.c
index ec191eac7..ee0bf86bc 100644
--- a/src/libserver/binlog.c
+++ b/src/libserver/binlog.c
@@ -22,9 +22,9 @@
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
-#include "config.h"
#include "binlog.h"
#include "cfg_file.h"
+#include "config.h"
#include "tokenizers/tokenizers.h"
#define BINLOG_SUFFIX ".binlog"
@@ -35,7 +35,7 @@
static GHashTable *binlog_opened = NULL;
static rspamd_mempool_t *binlog_pool = NULL;
-static gboolean
+static gboolean
binlog_write_header (struct rspamd_binlog *log)
{
struct rspamd_binlog_header header = {
@@ -43,27 +43,35 @@ binlog_write_header (struct rspamd_binlog *log)
.version = VALID_VERSION,
.padding = { '\0', '\0' },
};
-
+
header.create_time = time (NULL);
lock_file (log->fd, FALSE);
-
+
if (write (log->fd, &header, sizeof (struct rspamd_binlog_header)) == -1) {
- msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+ msg_warn ("cannot write file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
return FALSE;
}
memcpy (&log->header, &header, sizeof (struct rspamd_binlog_header));
-
+
/* Metaindex */
log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex));
bzero (log->metaindex, sizeof (struct rspamd_binlog_metaindex));
/* Offset to metaindex */
- log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) + sizeof (struct rspamd_binlog_header);
+ log->metaindex->indexes[0] = sizeof (struct rspamd_binlog_metaindex) +
+ sizeof (struct rspamd_binlog_header);
- if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) {
+ if (write (log->fd, log->metaindex,
+ sizeof (struct rspamd_binlog_metaindex)) == -1) {
g_free (log->metaindex);
- msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+ msg_warn ("cannot write file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
unlock_file (log->fd, FALSE);
return FALSE;
}
@@ -71,9 +79,13 @@ binlog_write_header (struct rspamd_binlog *log)
/* Alloc, write, mmap */
log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
bzero (log->cur_idx, sizeof (struct rspamd_index_block));
- if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) {
+ if (write (log->fd, log->cur_idx,
+ sizeof (struct rspamd_index_block)) == -1) {
g_free (log->cur_idx);
- msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+ msg_warn ("cannot write file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
unlock_file (log->fd, FALSE);
return FALSE;
}
@@ -88,14 +100,20 @@ binlog_check_file (struct rspamd_binlog *log)
{
static gchar valid_magic[] = VALID_MAGIC, valid_version[] = VALID_VERSION;
- if (read (log->fd, &log->header, sizeof (struct rspamd_binlog_header)) != sizeof (struct rspamd_binlog_header)) {
- msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno));
+ if (read (log->fd, &log->header,
+ sizeof (struct rspamd_binlog_header)) !=
+ sizeof (struct rspamd_binlog_header)) {
+ msg_warn ("cannot read file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
return FALSE;
}
/* Now check all fields */
- if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 ||
- memcmp (&log->header.version, valid_version, sizeof (valid_version)) != 0) {
+ if (memcmp (&log->header.magic, valid_magic, sizeof (valid_magic)) != 0 ||
+ memcmp (&log->header.version, valid_version,
+ sizeof (valid_version)) != 0) {
msg_warn ("cannot validate file %s");
return FALSE;
}
@@ -103,24 +121,38 @@ binlog_check_file (struct rspamd_binlog *log)
if (log->metaindex == NULL) {
log->metaindex = g_malloc (sizeof (struct rspamd_binlog_metaindex));
}
- if ((read (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex))) != sizeof (struct rspamd_binlog_metaindex)) {
- msg_warn ("cannot read metaindex of file %s, error %d, %s", log->filename, errno, strerror (errno));
+ if ((read (log->fd, log->metaindex,
+ sizeof (struct rspamd_binlog_metaindex))) !=
+ sizeof (struct rspamd_binlog_metaindex)) {
+ msg_warn ("cannot read metaindex of file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
return FALSE;
}
/* Current index */
if (log->cur_idx == NULL) {
log->cur_idx = g_malloc (sizeof (struct rspamd_index_block));
}
- if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) {
- msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+ if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index],
+ SEEK_SET) == -1) {
+ msg_info ("cannot seek in file: %s, error: %s",
+ log->filename,
+ strerror (errno));
return FALSE;
}
- if ((read (log->fd, log->cur_idx, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) {
- msg_warn ("cannot read index in file %s, error %d, %s", log->filename, errno, strerror (errno));
+ if ((read (log->fd, log->cur_idx,
+ sizeof (struct rspamd_index_block))) !=
+ sizeof (struct rspamd_index_block)) {
+ msg_warn ("cannot read index in file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
return FALSE;
}
- log->cur_seq = log->metaindex->last_index * BINLOG_IDX_LEN + log->cur_idx->last_index;
+ log->cur_seq = log->metaindex->last_index * BINLOG_IDX_LEN +
+ log->cur_idx->last_index;
log->cur_time = log->cur_idx->indexes[log->cur_idx->last_index].time;
return TRUE;
@@ -130,8 +162,13 @@ binlog_check_file (struct rspamd_binlog *log)
static gboolean
binlog_create (struct rspamd_binlog *log)
{
- if ((log->fd = open (log->filename, O_RDWR | O_TRUNC | O_CREAT, S_IWUSR | S_IRUSR)) == -1) {
- msg_info ("cannot create file %s, error %d, %s", log->filename, errno, strerror (errno));
+ if ((log->fd =
+ open (log->filename, O_RDWR | O_TRUNC | O_CREAT,
+ S_IWUSR | S_IRUSR)) == -1) {
+ msg_info ("cannot create file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
return FALSE;
}
@@ -142,19 +179,25 @@ static gboolean
binlog_open_real (struct rspamd_binlog *log)
{
if ((log->fd = open (log->filename, O_RDWR)) == -1) {
- msg_info ("cannot open file %s, error %d, %s", log->filename, errno, strerror (errno));
+ msg_info ("cannot open file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
return FALSE;
}
-
+
return binlog_check_file (log);
}
-struct rspamd_binlog*
-binlog_open (rspamd_mempool_t *pool, const gchar *path, time_t rotate_time, gint rotate_jitter)
+struct rspamd_binlog *
+binlog_open (rspamd_mempool_t *pool,
+ const gchar *path,
+ time_t rotate_time,
+ gint rotate_jitter)
{
struct rspamd_binlog *new;
- gint len = strlen (path);
+ gint len = strlen (path);
struct stat st;
new = rspamd_mempool_alloc0 (pool, sizeof (struct rspamd_binlog));
@@ -165,15 +208,16 @@ binlog_open (rspamd_mempool_t *pool, const gchar *path, time_t rotate_time, gint
if (rotate_time) {
new->rotate_jitter = g_random_int_range (0, rotate_jitter);
}
-
+
new->filename = rspamd_mempool_alloc (pool, len + sizeof (BINLOG_SUFFIX));
- rspamd_strlcpy (new->filename, path, len + 1);
+ rspamd_strlcpy (new->filename, path, len + 1);
rspamd_strlcpy (new->filename + len, BINLOG_SUFFIX, sizeof (BINLOG_SUFFIX));
if (stat (new->filename, &st) == -1) {
/* Check errno to check whether we should create this file */
if (errno != ENOENT) {
- msg_err ("cannot stat file: %s, error %s", new->filename, strerror (errno));
+ msg_err ("cannot stat file: %s, error %s", new->filename,
+ strerror (errno));
return NULL;
}
else {
@@ -193,7 +237,7 @@ binlog_open (rspamd_mempool_t *pool, const gchar *path, time_t rotate_time, gint
return new;
}
-void
+void
binlog_close (struct rspamd_binlog *log)
{
if (log) {
@@ -210,16 +254,18 @@ binlog_close (struct rspamd_binlog *log)
static gboolean
binlog_tree_callback (gpointer key, gpointer value, gpointer data)
{
- token_node_t *node = key;
- struct rspamd_binlog *log = data;
- struct rspamd_binlog_element elt;
+ token_node_t *node = key;
+ struct rspamd_binlog *log = data;
+ struct rspamd_binlog_element elt;
elt.h1 = node->h1;
elt.h2 = node->h2;
elt.value = node->value;
-
+
if (write (log->fd, &elt, sizeof (elt)) == -1) {
- msg_info ("cannot write token to file: %s, error: %s", log->filename, strerror (errno));
+ msg_info ("cannot write token to file: %s, error: %s",
+ log->filename,
+ strerror (errno));
return TRUE;
}
@@ -233,12 +279,14 @@ write_binlog_tree (struct rspamd_binlog *log, GTree *nodes)
struct rspamd_binlog_index *idx;
lock_file (log->fd, FALSE);
- log->cur_seq ++;
+ log->cur_seq++;
/* Seek to end of file */
if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
unlock_file (log->fd, FALSE);
- msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+ msg_info ("cannot seek in file: %s, error: %s",
+ log->filename,
+ strerror (errno));
return FALSE;
}
@@ -251,21 +299,28 @@ write_binlog_tree (struct rspamd_binlog *log, GTree *nodes)
idx->time = (guint64)time (NULL);
log->cur_time = idx->time;
idx->len = g_tree_nnodes (nodes) * sizeof (struct rspamd_binlog_element);
- if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index], SEEK_SET) == -1) {
+ if (lseek (log->fd, log->metaindex->indexes[log->metaindex->last_index],
+ SEEK_SET) == -1) {
unlock_file (log->fd, FALSE);
- msg_info ("cannot seek in file: %s, error: %s, seek: %L, op: insert index", log->filename,
- strerror (errno), log->metaindex->indexes[log->metaindex->last_index]);
+ msg_info (
+ "cannot seek in file: %s, error: %s, seek: %L, op: insert index",
+ log->filename,
+ strerror (errno),
+ log->metaindex->indexes[log->metaindex->last_index]);
return FALSE;
}
- log->cur_idx->last_index ++;
- if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) {
+ log->cur_idx->last_index++;
+ if (write (log->fd, log->cur_idx,
+ sizeof (struct rspamd_index_block)) == -1) {
unlock_file (log->fd, FALSE);
- msg_info ("cannot write index to file: %s, error: %s", log->filename, strerror (errno));
+ msg_info ("cannot write index to file: %s, error: %s",
+ log->filename,
+ strerror (errno));
return FALSE;
}
unlock_file (log->fd, FALSE);
-
+
return TRUE;
}
@@ -275,18 +330,24 @@ create_new_metaindex_block (struct rspamd_binlog *log)
off_t seek;
lock_file (log->fd, FALSE);
-
- log->metaindex->last_index ++;
+
+ log->metaindex->last_index++;
/* Seek to end of file */
if ((seek = lseek (log->fd, 0, SEEK_END)) == -1) {
unlock_file (log->fd, FALSE);
- msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+ msg_info ("cannot seek in file: %s, error: %s",
+ log->filename,
+ strerror (errno));
return FALSE;
}
- if (write (log->fd, log->cur_idx, sizeof (struct rspamd_index_block)) == -1) {
+ if (write (log->fd, log->cur_idx,
+ sizeof (struct rspamd_index_block)) == -1) {
unlock_file (log->fd, FALSE);
g_free (log->cur_idx);
- msg_warn ("cannot write file %s, error %d, %s", log->filename, errno, strerror (errno));
+ msg_warn ("cannot write file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
return FALSE;
}
/* Offset to metaindex */
@@ -294,12 +355,17 @@ create_new_metaindex_block (struct rspamd_binlog *log)
/* Overwrite all metaindexes */
if (lseek (log->fd, sizeof (struct rspamd_binlog_header), SEEK_SET) == -1) {
unlock_file (log->fd, FALSE);
- msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+ msg_info ("cannot seek in file: %s, error: %s",
+ log->filename,
+ strerror (errno));
return FALSE;
}
- if (write (log->fd, log->metaindex, sizeof (struct rspamd_binlog_metaindex)) == -1) {
+ if (write (log->fd, log->metaindex,
+ sizeof (struct rspamd_binlog_metaindex)) == -1) {
unlock_file (log->fd, FALSE);
- msg_info ("cannot write metaindex in file: %s, error: %s", log->filename, strerror (errno));
+ msg_info ("cannot write metaindex in file: %s, error: %s",
+ log->filename,
+ strerror (errno));
return FALSE;
}
bzero (log->cur_idx, sizeof (struct rspamd_index_block));
@@ -311,9 +377,11 @@ create_new_metaindex_block (struct rspamd_binlog *log)
static gboolean
maybe_rotate_binlog (struct rspamd_binlog *log)
{
- guint64 now = time (NULL);
+ guint64 now = time (NULL);
- if (log->rotate_time && ((now - log->header.create_time) > (guint)(log->rotate_time + log->rotate_jitter))) {
+ if (log->rotate_time &&
+ ((now - log->header.create_time) >
+ (guint)(log->rotate_time + log->rotate_jitter))) {
return TRUE;
}
return FALSE;
@@ -322,7 +390,7 @@ maybe_rotate_binlog (struct rspamd_binlog *log)
static gboolean
rotate_binlog (struct rspamd_binlog *log)
{
- gchar *backup_name;
+ gchar *backup_name;
struct stat st;
lock_file (log->fd, FALSE);
@@ -355,13 +423,14 @@ rotate_binlog (struct rspamd_binlog *log)
}
-gboolean
+gboolean
binlog_insert (struct rspamd_binlog *log, GTree *nodes)
{
off_t seek;
if (!log || !log->metaindex || !log->cur_idx || !nodes) {
- msg_info ("improperly opened binlog: %s", log != NULL ? log->filename : "unknown");
+ msg_info ("improperly opened binlog: %s",
+ log != NULL ? log->filename : "unknown");
return FALSE;
}
@@ -381,7 +450,9 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes)
if (log->metaindex->last_index < METAINDEX_LEN) {
/* Create new index block */
if ((seek = lseek (log->fd, 0, SEEK_END)) == (off_t)-1) {
- msg_info ("cannot seek in file: %s, error: %s", log->filename, strerror (errno));
+ msg_info ("cannot seek in file: %s, error: %s",
+ log->filename,
+ strerror (errno));
return FALSE;
}
if (!create_new_metaindex_block (log)) {
@@ -389,7 +460,7 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes)
}
return write_binlog_tree (log, nodes);
}
-
+
/* All binlog is filled, we need to rotate it forcefully */
if (!rotate_binlog (log)) {
return FALSE;
@@ -398,16 +469,20 @@ binlog_insert (struct rspamd_binlog *log, GTree *nodes)
return write_binlog_tree (log, nodes);
}
-gboolean
-binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GByteArray **rep)
+gboolean
+binlog_sync (struct rspamd_binlog *log,
+ guint64 from_rev,
+ guint64 *from_time,
+ GByteArray **rep)
{
- guint32 metaindex_num;
+ guint32 metaindex_num;
struct rspamd_index_block *idxb;
struct rspamd_binlog_index *idx;
gboolean idx_mapped = FALSE, res = TRUE, is_first = FALSE;
if (!log || !log->metaindex || !log->cur_idx) {
- msg_info ("improperly opened binlog: %s", log != NULL ? log->filename : "unknown");
+ msg_info ("improperly opened binlog: %s",
+ log != NULL ? log->filename : "unknown");
return FALSE;
}
@@ -419,7 +494,7 @@ binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GB
/* Unmap old fragment */
g_free ((*rep)->data);
}
-
+
if (from_rev == log->cur_seq) {
/* Last record */
*rep = NULL;
@@ -427,7 +502,11 @@ binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GB
}
else if (from_rev > log->cur_seq) {
/* Slave has more actual copy, write this to log and abort sync */
- msg_warn ("slave has more recent revision of statfile %s: %uL and our is: %uL", log->filename, from_rev, log->cur_seq);
+ msg_warn (
+ "slave has more recent revision of statfile %s: %uL and our is: %uL",
+ log->filename,
+ from_rev,
+ log->cur_seq);
*rep = NULL;
*from_time = 0;
return FALSE;
@@ -443,15 +522,24 @@ binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GB
lock_file (log->fd, FALSE);
idxb = g_malloc (sizeof (struct rspamd_index_block));
idx_mapped = TRUE;
- if (lseek (log->fd, log->metaindex->indexes[metaindex_num], SEEK_SET) == -1) {
+ if (lseek (log->fd, log->metaindex->indexes[metaindex_num],
+ SEEK_SET) == -1) {
unlock_file (log->fd, FALSE);
- msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno));
+ msg_warn ("cannot seek file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
res = FALSE;
goto end;
}
- if ((read (log->fd, idxb, sizeof (struct rspamd_index_block))) != sizeof (struct rspamd_index_block)) {
+ if ((read (log->fd, idxb,
+ sizeof (struct rspamd_index_block))) !=
+ sizeof (struct rspamd_index_block)) {
unlock_file (log->fd, FALSE);
- msg_warn ("cannot read index from file %s, error %d, %s", log->filename, errno, strerror (errno));
+ msg_warn ("cannot read index from file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
res = FALSE;
goto end;
}
@@ -474,17 +562,27 @@ binlog_sync (struct rspamd_binlog *log, guint64 from_rev, guint64 *from_time, GB
/* Now fill reply structure */
(*rep)->len = idx->len;
/* Read result */
- msg_info ("update from binlog '%s' from revision: %uL to revision %uL size is %uL",
- log->filename, from_rev, log->cur_seq, idx->len);
+ msg_info (
+ "update from binlog '%s' from revision: %uL to revision %uL size is %uL",
+ log->filename,
+ from_rev,
+ log->cur_seq,
+ idx->len);
if (lseek (log->fd, idx->seek, SEEK_SET) == -1) {
- msg_warn ("cannot seek file %s, error %d, %s", log->filename, errno, strerror (errno));
+ msg_warn ("cannot seek file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
res = FALSE;
goto end;
}
-
+
(*rep)->data = g_malloc (idx->len);
if ((read (log->fd, (*rep)->data, idx->len)) != (ssize_t)idx->len) {
- msg_warn ("cannot read file %s, error %d, %s", log->filename, errno, strerror (errno));
+ msg_warn ("cannot read file %s, error %d, %s",
+ log->filename,
+ errno,
+ strerror (errno));
res = FALSE;
goto end;
}
@@ -517,8 +615,11 @@ maybe_init_static (void)
return TRUE;
}
-gboolean
-maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile_config *st, stat_file_t *file, GTree *nodes)
+gboolean
+maybe_write_binlog (struct rspamd_classifier_config *ccf,
+ struct rspamd_statfile_config *st,
+ stat_file_t *file,
+ GTree *nodes)
{
struct rspamd_binlog *log;
@@ -526,8 +627,9 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile
return FALSE;
}
-
- if (st == NULL || nodes == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) {
+
+ if (st == NULL || nodes == NULL || st->binlog == NULL ||
+ st->binlog->affinity != AFFINITY_MASTER) {
return FALSE;
}
@@ -536,7 +638,9 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile
}
if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) {
- if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) {
+ if ((log =
+ binlog_open (binlog_pool, st->path, st->binlog->rotate_time,
+ st->binlog->rotate_time / 2)) != NULL) {
g_hash_table_insert (binlog_opened, st, log);
}
else {
@@ -545,7 +649,9 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile
}
if (binlog_insert (log, nodes)) {
- msg_info ("set new revision of statfile %s: %uL", st->symbol, log->cur_seq);
+ msg_info ("set new revision of statfile %s: %uL",
+ st->symbol,
+ log->cur_seq);
(void)statfile_set_revision (file, log->cur_seq, log->cur_time);
return TRUE;
}
@@ -553,12 +659,13 @@ maybe_write_binlog (struct rspamd_classifier_config *ccf, struct rspamd_statfile
return FALSE;
}
-struct rspamd_binlog*
+struct rspamd_binlog *
get_binlog_by_statfile (struct rspamd_statfile_config *st)
{
struct rspamd_binlog *log;
- if (st == NULL || st->binlog == NULL || st->binlog->affinity != AFFINITY_MASTER) {
+ if (st == NULL || st->binlog == NULL || st->binlog->affinity !=
+ AFFINITY_MASTER) {
return NULL;
}
@@ -567,13 +674,15 @@ get_binlog_by_statfile (struct rspamd_statfile_config *st)
}
if ((log = g_hash_table_lookup (binlog_opened, st)) == NULL) {
- if ((log = binlog_open (binlog_pool, st->path, st->binlog->rotate_time, st->binlog->rotate_time / 2)) != NULL) {
+ if ((log =
+ binlog_open (binlog_pool, st->path, st->binlog->rotate_time,
+ st->binlog->rotate_time / 2)) != NULL) {
g_hash_table_insert (binlog_opened, st, log);
}
else {
return NULL;
}
}
-
+
return log;
}