Browse Source

* Implement file backend.

tags/0.4.5
Vsevolod Stakhov 12 years ago
parent
commit
af6dc0cb17
4 changed files with 134 additions and 10 deletions
  1. 3
    0
      CMakeLists.txt
  2. 4
    0
      config.h.in
  3. 1
    1
      src/kvstorage.c
  4. 126
    9
      src/kvstorage_file.c

+ 3
- 0
CMakeLists.txt View File

@@ -676,6 +676,9 @@ CHECK_SYMBOL_EXISTS(MAXPATHLEN sys/param.h HAVE_MAXPATHLEN)
CHECK_SYMBOL_EXISTS(MAP_SHARED sys/mman.h HAVE_MMAP_SHARED)
CHECK_SYMBOL_EXISTS(MAP_ANON sys/mman.h HAVE_MMAP_ANON)
CHECK_SYMBOL_EXISTS(MAP_NOCORE sys/mman.h HAVE_MMAP_NOCORE)
CHECK_SYMBOL_EXISTS(O_DIRECT fcntl.h HAVE_O_DIRECT)
CHECK_SYMBOL_EXISTS(posix_fadvise fcntl.h HAVE_FADVISE)
CHECK_SYMBOL_EXISTS(fdatasync unistd.h HAVE_FDATASYNC)
CHECK_SYMBOL_EXISTS(_SC_NPROCESSORS_ONLN unistd.h HAVE_SC_NPROCESSORS_ONLN)
IF(HAVE_SIGINFO_H)
CHECK_SYMBOL_EXISTS(SA_SIGINFO "signal.h;siginfo.h" HAVE_SA_SIGINFO)

+ 4
- 0
config.h.in View File

@@ -120,6 +120,10 @@

#cmakedefine HAVE_MMAP_NOCORE 1

#cmakedefine HAVE_O_DIRECT 1

#cmakedefine HAVE_FADVISE 1
#cmakedefine HAVE_FDATASYNC 1
#cmakedefine HAVE_COMPATIBLE_QUEUE_H 1

#cmakedefine HAVE_SC_NPROCESSORS_ONLN 1

+ 1
- 1
src/kvstorage.c View File

@@ -93,7 +93,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
}

/* Now check limits */
while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
while (storage->memory + len > storage->max_memory) {
if (storage->expire) {
storage->expire->step_func (storage->expire, storage, time (NULL), steps);
}

+ 126
- 9
src/kvstorage_file.c View File

@@ -48,6 +48,7 @@ struct rspamd_file_backend {
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
gchar *filename;
gchar *dirname;
guint dirlen;
guint sync_ops;
guint levels;
GQueue *ops_queue;
@@ -57,13 +58,88 @@ struct rspamd_file_backend {

static const gchar hexdigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};

/* Generate file name for operation */
static gboolean
get_file_name (struct rspamd_file_backend *db, gchar *key, guint keylen, gchar *filebuf, guint buflen)
{
gchar *p = filebuf, *end = filebuf + buflen,
*k = key;
guint i;

/* First copy backend dirname to file buf */
if (buflen <= db->dirlen) {
return FALSE;
}
memcpy (p, db->dirname, db->dirlen);
p += db->dirlen;
*p++ = G_DIR_SEPARATOR;
for (i = 0; i < MIN (keylen, db->levels); i ++) {
if (p == end) {
/* Filebuf is not large enough */
return FALSE;
}
*p++ = hexdigits[(*k) % 16];
*p++ = G_DIR_SEPARATOR;
k ++;
}
/* Now we have directory, append base64 encoded filename */
k = key;
if (end - p < (keylen / 3 + 1) * 4 + 4 + 1) {
/* Filebuf is not large enough */
return FALSE;
}

i = 0;
p += g_base64_encode_step (key, keylen, FALSE, p, &i, &i);
*p = '\0';

return TRUE;
}

/* Process single file operation */
static gboolean
file_process_single_op (struct rspamd_file_backend *db, struct file_op *op)
{
gboolean res = FALSE;
gchar filebuf[PATH_MAX];
gint fd, flags;

/* Get filename */
if (!get_file_name (db, ELT_KEY (op->elt), op->elt->keylen, filebuf, sizeof (filebuf))) {
return FALSE;
}

return res;
if (op->op == FILE_OP_DELETE) {
return unlink (filebuf) != -1;
}
else {
#ifdef HAVE_O_DIRECT
flags = O_CREAT|O_TRUNC|O_WRONLY;
#else
flags = O_CREAT|O_TRUNC|O_WRONLY;
#endif
/* Open file */
if ((fd = open (filebuf, flags, S_IRUSR|S_IWUSR|S_IRGRP)) == -1) {
return FALSE;
}

#ifdef HAVE_FADVISE
posix_fadvise (fd, 0, ELT_SIZE (op->elt), POSIX_FADV_SEQUENTIAL);
#endif
if (write (fd, op->elt, ELT_SIZE (op->elt)) == -1) {
msg_info ("%d: %s", errno, strerror (errno));
close (fd);
return FALSE;
}
}

#ifdef HAVE_FDATASYNC
fdatasync (fd);
#else
fsync (fd);
#endif

close (fd);
return TRUE;
}

/* Process operations queue */
@@ -107,15 +183,16 @@ file_process_queue (struct rspamd_kv_backend *backend)
static gboolean
rspamd_recursive_mkdir (guint levels)
{
guint i, j;
guint i;
gchar nbuf[5];

/* Create directories for backend */
if (levels > 0) {
/* Create 16 directories */
for (j = 0; j < 16; j ++) {
rspamd_snprintf (nbuf, sizeof (nbuf), "./%c", hexdigits[j]);
for (i = 0; i < 16; i ++) {
rspamd_snprintf (nbuf, sizeof (nbuf), "./%c", hexdigits[i]);
if (mkdir (nbuf, 0755) != 0 && errno != EEXIST) {
msg_info ("cannot create directory %s: %s", nbuf, strerror (errno));
return FALSE;
}
else if (levels > 1) {
@@ -136,7 +213,6 @@ static void
rspamd_file_init (struct rspamd_kv_backend *backend)
{
struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend;
gint ret;
gchar pathbuf[PATH_MAX];

/* Save current directory */
@@ -153,7 +229,9 @@ rspamd_file_init (struct rspamd_kv_backend *backend)
}

/* Create directories for backend */
rspamd_recursive_mkdir (db->levels);
if (!rspamd_recursive_mkdir (db->levels)) {
goto err;
}

db->initialized = TRUE;

@@ -242,8 +320,9 @@ rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key)
struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend;
struct file_op *op;
struct rspamd_kv_element *elt = NULL;
gint l;
gconstpointer d;
gchar filebuf[PATH_MAX];
gint fd, flags;
struct stat st;

if (!db->initialized) {
return NULL;
@@ -256,6 +335,40 @@ rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key)
}
return op->elt;
}

/* Get filename */
if (!get_file_name (db, key, strlen (key), filebuf, sizeof (filebuf))) {
return NULL;
}

#ifdef HAVE_O_DIRECT
flags = O_RDONLY;
#else
flags = O_RDONLY;
#endif
/* Open file */
if ((fd = open (filebuf, flags)) == -1) {
return NULL;
}

if (fstat (fd, &st) == -1) {
return NULL;
}

#ifdef HAVE_FADVISE
posix_fadvise (fd, 0, st.st_size, POSIX_FADV_SEQUENTIAL);
#endif

/* Read element */
elt = g_malloc (st.st_size);
if (read (fd, elt, st.st_size) == -1) {
g_free (elt);
close (fd);
return NULL;
}

close (fd);

return elt;
}

@@ -306,6 +419,9 @@ rspamd_file_destroy (struct rspamd_kv_backend *backend)
g_queue_free (db->ops_queue);
g_hash_table_unref (db->ops_hash);
g_slice_free1 (sizeof (struct rspamd_file_backend), db);

/* Sync again */
sync ();
}
}

@@ -333,6 +449,7 @@ rspamd_kv_file_new (const gchar *filename, guint sync_ops, guint levels)

new = g_slice_alloc0 (sizeof (struct rspamd_file_backend));
new->dirname = dirname;
new->dirlen = strlen (dirname);
new->filename = g_strdup (filename);
new->sync_ops = sync_ops;
new->levels = levels;

Loading…
Cancel
Save