diff options
-rw-r--r-- | CMakeLists.txt | 3 | ||||
-rw-r--r-- | config.h.in | 4 | ||||
-rw-r--r-- | src/kvstorage.c | 2 | ||||
-rw-r--r-- | src/kvstorage_file.c | 135 |
4 files changed, 134 insertions, 10 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index b074d8c18..e9e41b6a0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -676,6 +676,9 @@ CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN) CHECK_SYMBOL_EXISTS(MAP_SHARED sys/mman.h HAVE_MMAP_SHARED) CHECK_SYMBOL_EXISTS(MAP_ANON sys/mman.h HAVE_MMAP_ANON) CHECK_SYMBOL_EXISTS(MAP_NOCORE sys/mman.h HAVE_MMAP_NOCORE) +CHECK_SYMBOL_EXISTS(O_DIRECT fcntl.h HAVE_O_DIRECT) +CHECK_SYMBOL_EXISTS(posix_fadvise fcntl.h HAVE_FADVISE) +CHECK_SYMBOL_EXISTS(fdatasync unistd.h HAVE_FDATASYNC) CHECK_SYMBOL_EXISTS(_SC_NPROCESSORS_ONLN unistd.h HAVE_SC_NPROCESSORS_ONLN) IF(HAVE_SIGINFO_H) CHECK_SYMBOL_EXISTS(SA_SIGINFO "signal.h;siginfo.h" HAVE_SA_SIGINFO) diff --git a/config.h.in b/config.h.in index 433c6789b..1a781a171 100644 --- a/config.h.in +++ b/config.h.in @@ -120,6 +120,10 @@ #cmakedefine HAVE_MMAP_NOCORE 1 +#cmakedefine HAVE_O_DIRECT 1 + +#cmakedefine HAVE_FADVISE 1 +#cmakedefine HAVE_FDATASYNC 1 #cmakedefine HAVE_COMPATIBLE_QUEUE_H 1 #cmakedefine HAVE_SC_NPROCESSORS_ONLN 1 diff --git a/src/kvstorage.c b/src/kvstorage.c index 511526336..bc9e600bc 100644 --- a/src/kvstorage.c +++ b/src/kvstorage.c @@ -93,7 +93,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k } /* Now check limits */ - while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) { + while (storage->memory + len > storage->max_memory) { if (storage->expire) { storage->expire->step_func (storage->expire, storage, time (NULL), steps); } diff --git a/src/kvstorage_file.c b/src/kvstorage_file.c index a7343aab9..14ae8d3b4 100644 --- a/src/kvstorage_file.c +++ b/src/kvstorage_file.c @@ -48,6 +48,7 @@ struct rspamd_file_backend { backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ gchar *filename; gchar *dirname; + guint dirlen; guint sync_ops; guint levels; GQueue *ops_queue; @@ -57,13 +58,88 @@ struct rspamd_file_backend { static const gchar hexdigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; +/* Generate file name for operation */ +static gboolean +get_file_name (struct rspamd_file_backend *db, gchar *key, guint keylen, gchar *filebuf, guint buflen) +{ + gchar *p = filebuf, *end = filebuf + buflen, + *k = key; + guint i; + + /* First copy backend dirname to file buf */ + if (buflen <= db->dirlen) { + return FALSE; + } + memcpy (p, db->dirname, db->dirlen); + p += db->dirlen; + *p++ = G_DIR_SEPARATOR; + for (i = 0; i < MIN (keylen, db->levels); i ++) { + if (p == end) { + /* Filebuf is not large enough */ + return FALSE; + } + *p++ = hexdigits[(*k) % 16]; + *p++ = G_DIR_SEPARATOR; + k ++; + } + /* Now we have directory, append base64 encoded filename */ + k = key; + if (end - p < (keylen / 3 + 1) * 4 + 4 + 1) { + /* Filebuf is not large enough */ + return FALSE; + } + + i = 0; + p += g_base64_encode_step (key, keylen, FALSE, p, &i, &i); + *p = '\0'; + + return TRUE; +} + /* Process single file operation */ static gboolean file_process_single_op (struct rspamd_file_backend *db, struct file_op *op) { - gboolean res = FALSE; + gchar filebuf[PATH_MAX]; + gint fd, flags; + + /* Get filename */ + if (!get_file_name (db, ELT_KEY (op->elt), op->elt->keylen, filebuf, sizeof (filebuf))) { + return FALSE; + } - return res; + if (op->op == FILE_OP_DELETE) { + return unlink (filebuf) != -1; + } + else { +#ifdef HAVE_O_DIRECT + flags = O_CREAT|O_TRUNC|O_WRONLY; +#else + flags = O_CREAT|O_TRUNC|O_WRONLY; +#endif + /* Open file */ + if ((fd = open (filebuf, flags, S_IRUSR|S_IWUSR|S_IRGRP)) == -1) { + return FALSE; + } + +#ifdef HAVE_FADVISE + posix_fadvise (fd, 0, ELT_SIZE (op->elt), POSIX_FADV_SEQUENTIAL); +#endif + if (write (fd, op->elt, ELT_SIZE (op->elt)) == -1) { + msg_info ("%d: %s", errno, strerror (errno)); + close (fd); + return FALSE; + } + } + +#ifdef HAVE_FDATASYNC + fdatasync (fd); +#else + fsync (fd); +#endif + + close (fd); + return TRUE; } /* Process operations queue */ @@ -107,15 +183,16 @@ file_process_queue (struct rspamd_kv_backend *backend) static gboolean rspamd_recursive_mkdir (guint levels) { - guint i, j; + guint i; gchar nbuf[5]; /* Create directories for backend */ if (levels > 0) { /* Create 16 directories */ - for (j = 0; j < 16; j ++) { - rspamd_snprintf (nbuf, sizeof (nbuf), "./%c", hexdigits[j]); + for (i = 0; i < 16; i ++) { + rspamd_snprintf (nbuf, sizeof (nbuf), "./%c", hexdigits[i]); if (mkdir (nbuf, 0755) != 0 && errno != EEXIST) { + msg_info ("cannot create directory %s: %s", nbuf, strerror (errno)); return FALSE; } else if (levels > 1) { @@ -136,7 +213,6 @@ static void rspamd_file_init (struct rspamd_kv_backend *backend) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; - gint ret; gchar pathbuf[PATH_MAX]; /* Save current directory */ @@ -153,7 +229,9 @@ rspamd_file_init (struct rspamd_kv_backend *backend) } /* Create directories for backend */ - rspamd_recursive_mkdir (db->levels); + if (!rspamd_recursive_mkdir (db->levels)) { + goto err; + } db->initialized = TRUE; @@ -242,8 +320,9 @@ rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key) struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; struct file_op *op; struct rspamd_kv_element *elt = NULL; - gint l; - gconstpointer d; + gchar filebuf[PATH_MAX]; + gint fd, flags; + struct stat st; if (!db->initialized) { return NULL; @@ -256,6 +335,40 @@ rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key) } return op->elt; } + + /* Get filename */ + if (!get_file_name (db, key, strlen (key), filebuf, sizeof (filebuf))) { + return NULL; + } + +#ifdef HAVE_O_DIRECT + flags = O_RDONLY; +#else + flags = O_RDONLY; +#endif + /* Open file */ + if ((fd = open (filebuf, flags)) == -1) { + return NULL; + } + + if (fstat (fd, &st) == -1) { + return NULL; + } + +#ifdef HAVE_FADVISE + posix_fadvise (fd, 0, st.st_size, POSIX_FADV_SEQUENTIAL); +#endif + + /* Read element */ + elt = g_malloc (st.st_size); + if (read (fd, elt, st.st_size) == -1) { + g_free (elt); + close (fd); + return NULL; + } + + close (fd); + return elt; } @@ -306,6 +419,9 @@ rspamd_file_destroy (struct rspamd_kv_backend *backend) g_queue_free (db->ops_queue); g_hash_table_unref (db->ops_hash); g_slice_free1 (sizeof (struct rspamd_file_backend), db); + + /* Sync again */ + sync (); } } @@ -333,6 +449,7 @@ rspamd_kv_file_new (const gchar *filename, guint sync_ops, guint levels) new = g_slice_alloc0 (sizeof (struct rspamd_file_backend)); new->dirname = dirname; + new->dirlen = strlen (dirname); new->filename = g_strdup (filename); new->sync_ops = sync_ops; new->levels = levels; |