aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt3
-rw-r--r--config.h.in4
-rw-r--r--src/kvstorage.c2
-rw-r--r--src/kvstorage_file.c135
4 files changed, 134 insertions, 10 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b074d8c18..e9e41b6a0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -676,6 +676,9 @@ CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN)
CHECK_SYMBOL_EXISTS(MAP_SHARED sys/mman.h HAVE_MMAP_SHARED)
CHECK_SYMBOL_EXISTS(MAP_ANON sys/mman.h HAVE_MMAP_ANON)
CHECK_SYMBOL_EXISTS(MAP_NOCORE sys/mman.h HAVE_MMAP_NOCORE)
+CHECK_SYMBOL_EXISTS(O_DIRECT fcntl.h HAVE_O_DIRECT)
+CHECK_SYMBOL_EXISTS(posix_fadvise fcntl.h HAVE_FADVISE)
+CHECK_SYMBOL_EXISTS(fdatasync unistd.h HAVE_FDATASYNC)
CHECK_SYMBOL_EXISTS(_SC_NPROCESSORS_ONLN unistd.h HAVE_SC_NPROCESSORS_ONLN)
IF(HAVE_SIGINFO_H)
CHECK_SYMBOL_EXISTS(SA_SIGINFO "signal.h;siginfo.h" HAVE_SA_SIGINFO)
diff --git a/config.h.in b/config.h.in
index 433c6789b..1a781a171 100644
--- a/config.h.in
+++ b/config.h.in
@@ -120,6 +120,10 @@
#cmakedefine HAVE_MMAP_NOCORE 1
+#cmakedefine HAVE_O_DIRECT 1
+
+#cmakedefine HAVE_FADVISE 1
+#cmakedefine HAVE_FDATASYNC 1
#cmakedefine HAVE_COMPATIBLE_QUEUE_H 1
#cmakedefine HAVE_SC_NPROCESSORS_ONLN 1
diff --git a/src/kvstorage.c b/src/kvstorage.c
index 511526336..bc9e600bc 100644
--- a/src/kvstorage.c
+++ b/src/kvstorage.c
@@ -93,7 +93,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
}
/* Now check limits */
- while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
+ while (storage->memory + len > storage->max_memory) {
if (storage->expire) {
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
}
diff --git a/src/kvstorage_file.c b/src/kvstorage_file.c
index a7343aab9..14ae8d3b4 100644
--- a/src/kvstorage_file.c
+++ b/src/kvstorage_file.c
@@ -48,6 +48,7 @@ struct rspamd_file_backend {
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
gchar *filename;
gchar *dirname;
+ guint dirlen;
guint sync_ops;
guint levels;
GQueue *ops_queue;
@@ -57,13 +58,88 @@ struct rspamd_file_backend {
static const gchar hexdigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+/* Generate file name for operation */
+static gboolean
+get_file_name (struct rspamd_file_backend *db, gchar *key, guint keylen, gchar *filebuf, guint buflen)
+{
+ gchar *p = filebuf, *end = filebuf + buflen,
+ *k = key;
+ guint i;
+
+ /* First copy backend dirname to file buf */
+ if (buflen <= db->dirlen) {
+ return FALSE;
+ }
+ memcpy (p, db->dirname, db->dirlen);
+ p += db->dirlen;
+ *p++ = G_DIR_SEPARATOR;
+ for (i = 0; i < MIN (keylen, db->levels); i ++) {
+ if (p == end) {
+ /* Filebuf is not large enough */
+ return FALSE;
+ }
+ *p++ = hexdigits[(*k) % 16];
+ *p++ = G_DIR_SEPARATOR;
+ k ++;
+ }
+ /* Now we have directory, append base64 encoded filename */
+ k = key;
+ if (end - p < (keylen / 3 + 1) * 4 + 4 + 1) {
+ /* Filebuf is not large enough */
+ return FALSE;
+ }
+
+ i = 0;
+ p += g_base64_encode_step (key, keylen, FALSE, p, &i, &i);
+ *p = '\0';
+
+ return TRUE;
+}
+
/* Process single file operation */
static gboolean
file_process_single_op (struct rspamd_file_backend *db, struct file_op *op)
{
- gboolean res = FALSE;
+ gchar filebuf[PATH_MAX];
+ gint fd, flags;
+
+ /* Get filename */
+ if (!get_file_name (db, ELT_KEY (op->elt), op->elt->keylen, filebuf, sizeof (filebuf))) {
+ return FALSE;
+ }
- return res;
+ if (op->op == FILE_OP_DELETE) {
+ return unlink (filebuf) != -1;
+ }
+ else {
+#ifdef HAVE_O_DIRECT
+ flags = O_CREAT|O_TRUNC|O_WRONLY;
+#else
+ flags = O_CREAT|O_TRUNC|O_WRONLY;
+#endif
+ /* Open file */
+ if ((fd = open (filebuf, flags, S_IRUSR|S_IWUSR|S_IRGRP)) == -1) {
+ return FALSE;
+ }
+
+#ifdef HAVE_FADVISE
+ posix_fadvise (fd, 0, ELT_SIZE (op->elt), POSIX_FADV_SEQUENTIAL);
+#endif
+ if (write (fd, op->elt, ELT_SIZE (op->elt)) == -1) {
+ msg_info ("%d: %s", errno, strerror (errno));
+ close (fd);
+ return FALSE;
+ }
+ }
+
+#ifdef HAVE_FDATASYNC
+ fdatasync (fd);
+#else
+ fsync (fd);
+#endif
+
+ close (fd);
+ return TRUE;
}
/* Process operations queue */
@@ -107,15 +183,16 @@ file_process_queue (struct rspamd_kv_backend *backend)
static gboolean
rspamd_recursive_mkdir (guint levels)
{
- guint i, j;
+ guint i;
gchar nbuf[5];
/* Create directories for backend */
if (levels > 0) {
/* Create 16 directories */
- for (j = 0; j < 16; j ++) {
- rspamd_snprintf (nbuf, sizeof (nbuf), "./%c", hexdigits[j]);
+ for (i = 0; i < 16; i ++) {
+ rspamd_snprintf (nbuf, sizeof (nbuf), "./%c", hexdigits[i]);
if (mkdir (nbuf, 0755) != 0 && errno != EEXIST) {
+ msg_info ("cannot create directory %s: %s", nbuf, strerror (errno));
return FALSE;
}
else if (levels > 1) {
@@ -136,7 +213,6 @@ static void
rspamd_file_init (struct rspamd_kv_backend *backend)
{
struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend;
- gint ret;
gchar pathbuf[PATH_MAX];
/* Save current directory */
@@ -153,7 +229,9 @@ rspamd_file_init (struct rspamd_kv_backend *backend)
}
/* Create directories for backend */
- rspamd_recursive_mkdir (db->levels);
+ if (!rspamd_recursive_mkdir (db->levels)) {
+ goto err;
+ }
db->initialized = TRUE;
@@ -242,8 +320,9 @@ rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key)
struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend;
struct file_op *op;
struct rspamd_kv_element *elt = NULL;
- gint l;
- gconstpointer d;
+ gchar filebuf[PATH_MAX];
+ gint fd, flags;
+ struct stat st;
if (!db->initialized) {
return NULL;
@@ -256,6 +335,40 @@ rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key)
}
return op->elt;
}
+
+ /* Get filename */
+ if (!get_file_name (db, key, strlen (key), filebuf, sizeof (filebuf))) {
+ return NULL;
+ }
+
+#ifdef HAVE_O_DIRECT
+ flags = O_RDONLY;
+#else
+ flags = O_RDONLY;
+#endif
+ /* Open file */
+ if ((fd = open (filebuf, flags)) == -1) {
+ return NULL;
+ }
+
+ if (fstat (fd, &st) == -1) {
+ return NULL;
+ }
+
+#ifdef HAVE_FADVISE
+ posix_fadvise (fd, 0, st.st_size, POSIX_FADV_SEQUENTIAL);
+#endif
+
+ /* Read element */
+ elt = g_malloc (st.st_size);
+ if (read (fd, elt, st.st_size) == -1) {
+ g_free (elt);
+ close (fd);
+ return NULL;
+ }
+
+ close (fd);
+
return elt;
}
@@ -306,6 +419,9 @@ rspamd_file_destroy (struct rspamd_kv_backend *backend)
g_queue_free (db->ops_queue);
g_hash_table_unref (db->ops_hash);
g_slice_free1 (sizeof (struct rspamd_file_backend), db);
+
+ /* Sync again */
+ sync ();
}
}
@@ -333,6 +449,7 @@ rspamd_kv_file_new (const gchar *filename, guint sync_ops, guint levels)
new = g_slice_alloc0 (sizeof (struct rspamd_file_backend));
new->dirname = dirname;
+ new->dirlen = strlen (dirname);
new->filename = g_strdup (filename);
new->sync_ops = sync_ops;
new->levels = levels;