/* Copyright (c) 2010, Vsevolod Stakhov * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include "config.h" #include "kvstorage.h" #include "kvstorage_file.h" #include "util.h" #include "main.h" struct file_op { struct rspamd_kv_element *elt; enum { FILE_OP_INSERT, FILE_OP_DELETE, FILE_OP_REPLACE } op; guint32 ref; }; /* Main file structure */ struct rspamd_file_backend { backend_init init_func; /*< this callback is called on kv storage initialization */ backend_insert insert_func; /*< this callback is called when element is inserted */ backend_replace replace_func; /*< this callback is called when element is replaced */ backend_lookup lookup_func; /*< this callback is used for lookup of element */ backend_delete delete_func; /*< this callback is called when an element is deleted */ backend_sync sync_func; /*< this callback is called when backend need to be synced */ backend_incref incref_func; /*< this callback is called when element must be ref'd */ backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ gchar *filename; gchar *dirname; guint dirlen; guint sync_ops; guint levels; GQueue *ops_queue; GHashTable *ops_hash; gboolean do_fsync; gboolean do_ref; gboolean initialized; }; static const gchar hexdigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; /* Generate file name for operation */ static gboolean get_file_name (struct rspamd_file_backend *db, gchar *key, guint keylen, gchar *filebuf, guint buflen) { gchar *p = filebuf, *end = filebuf + buflen, *k = key, t; guint i; /* First copy backend dirname to file buf */ if (buflen <= db->dirlen) { return FALSE; } memcpy (p, db->dirname, db->dirlen); p += db->dirlen; *p++ = G_DIR_SEPARATOR; for (i = 0; i < MIN (keylen, db->levels); i ++) { if (p == end) { /* Filebuf is not large enough */ return FALSE; } t = *k; *p++ = hexdigits[(t & 0xf) ^ ((t & 0xf0) >> 4)]; *p++ = G_DIR_SEPARATOR; k ++; } /* Now we have directory, append base64 encoded filename */ k = key; if (end - p < keylen * 2 + 1) { /* Filebuf is not large enough */ return FALSE; } i = 0; while (k < key + keylen) { t = *k; *p++ = hexdigits[(t >> 4) & 0xf]; *p++ = hexdigits[t & 0xf]; k ++; } *p = '\0'; return TRUE; } /* Read reference from specified file */ static guint32 file_get_ref (gint fd) { guint32 target; if (read (fd, &target, sizeof (guint32)) != sizeof (guint32)) { return 0; } return target; } /* Set reference to specified file */ static gboolean file_set_ref (gint fd, guint32 ref) { if (write (fd, &ref, sizeof (guint32)) != sizeof (guint32)) { return FALSE; } return TRUE; } /* * Open file, set posix_fadvise and all necessary flags */ static gint file_open_fd (const gchar *path, gsize *len, gint flags) { gint fd; struct stat st; if ((flags & O_CREAT) != 0) { /* Open file */ if ((fd = open (path, flags, S_IRUSR|S_IWUSR|S_IRGRP)) != -1) { rspamd_fallocate (fd, 0, *len); #ifdef HAVE_FADVISE posix_fadvise (fd, 0, *len, POSIX_FADV_SEQUENTIAL); #endif } } else { /* Open file */ if ((fd = open (path, flags)) == -1) { return -1; } if (fstat (fd, &st) == -1) { close (fd); return -1; } #ifdef HAVE_FADVISE posix_fadvise (fd, 0, st.st_size, POSIX_FADV_SEQUENTIAL); #endif *len = st.st_size; } return fd; } /* Process single file operation */ static gboolean file_process_single_op (struct rspamd_file_backend *db, struct file_op *op, gint *pfd) { gchar filebuf[PATH_MAX]; gint fd; gsize len; struct iovec iov[2]; guint32 ref; /* Get filename */ if (!get_file_name (db, ELT_KEY (op->elt), op->elt->keylen, filebuf, sizeof (filebuf))) { return FALSE; } if (db->do_ref) { len = ELT_SIZE (op->elt) + sizeof (guint32); } else { len = ELT_SIZE (op->elt); } if (op->op == FILE_OP_DELETE) { if (db->do_ref) { if ((fd = file_open_fd (filebuf, &len, O_RDWR)) == -1) { *pfd = -1; return FALSE; } if ((ref = file_get_ref (fd)) <= 1) { /* Refcount is not enough, remove file */ close (fd); *pfd = -1; return unlink (filebuf) != -1; } else { /* Decrease ref */ lseek (fd, 0, SEEK_SET); if (! file_set_ref (fd, --ref)) { *pfd = fd; return FALSE; } } } else { *pfd = -1; return unlink (filebuf) != -1; } } else { if ((fd = file_open_fd (filebuf, &len, O_CREAT|O_WRONLY|O_TRUNC)) == -1) { *pfd = -1; return FALSE; } if (db->do_ref) { iov[0].iov_base = &op->ref; iov[0].iov_len = sizeof (guint32); iov[1].iov_base = op->elt; iov[1].iov_len = ELT_SIZE (op->elt); if (writev (fd, iov, G_N_ELEMENTS (iov)) == -1) { msg_info ("%d: %s", errno, strerror (errno)); *pfd = fd; return FALSE; } } else { if (write (fd, op->elt, ELT_SIZE (op->elt)) == -1) { msg_info ("%d: %s", errno, strerror (errno)); *pfd = fd; return FALSE; } } } *pfd = fd; return TRUE; } /* Sync vector of descriptors */ static void file_sync_fds (gint *fds, gint len, gboolean do_fsync) { gint i, fd; for (i = 0; i < len; i ++) { fd = fds[i]; if (fd != -1) { if (do_fsync) { #ifdef HAVE_FDATASYNC fdatasync (fd); #else fsync (fd); #endif } close (fd); } } } /* Process operations queue */ static gboolean file_process_queue (struct rspamd_kv_backend *backend) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; struct file_op *op; GList *cur; gint *fds, i = 0, len; len = g_queue_get_length (db->ops_queue); if (len == 0) { /* Nothing to process */ return TRUE; } fds = g_slice_alloc (len * sizeof (gint)); cur = db->ops_queue->head; while (cur) { op = cur->data; if (! file_process_single_op (db, op, &fds[i])) { file_sync_fds (fds, i, db->do_fsync); g_slice_free1 (len * sizeof (gint), fds); return FALSE; } i ++; cur = g_list_next (cur); } file_sync_fds (fds, i, db->do_fsync); g_slice_free1 (len * sizeof (gint), fds); /* Clean the queue */ g_hash_table_remove_all (db->ops_hash); cur = db->ops_queue->head; while (cur) { op = cur->data; if (op->op == FILE_OP_DELETE || ((op->elt->flags & KV_ELT_NEED_FREE) != 0 && (op->elt->flags & KV_ELT_NEED_INSERT) == 0)) { /* Also clean memory */ g_slice_free1 (ELT_SIZE (op->elt), op->elt); } else { /* Unset dirty flag */ op->elt->flags &= ~KV_ELT_DIRTY; } g_slice_free1 (sizeof (struct file_op), op); cur = g_list_next (cur); } g_queue_clear (db->ops_queue); return TRUE; } /* Make 16 directories for each level */ static gboolean rspamd_recursive_mkdir (guint levels) { guint i; gchar nbuf[5]; /* Create directories for backend */ if (levels > 0) { /* Create 16 directories */ for (i = 0; i < 16; i ++) { rspamd_snprintf (nbuf, sizeof (nbuf), "./%c", hexdigits[i]); if (mkdir (nbuf, 0755) != 0 && errno != EEXIST) { msg_info ("cannot create directory %s: %s", nbuf, strerror (errno)); return FALSE; } else if (levels > 1) { if (chdir (nbuf) == -1) { msg_err ("chdir to %s failed: %s", nbuf, strerror (errno)); return FALSE; } if (! rspamd_recursive_mkdir (levels - 1)) { return FALSE; } if (chdir ("../") == -1) { msg_err ("chdir to ../ failed: %s", strerror (errno)); return FALSE; } } } } return TRUE; } /* Backend callbacks */ static void rspamd_file_init (struct rspamd_kv_backend *backend) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; gchar pathbuf[PATH_MAX]; /* Save current directory */ if (getcwd (pathbuf, sizeof (pathbuf) - 1) == NULL) { pathbuf[0] = '\0'; msg_err ("getcwd failed: %s", strerror (errno)); goto err; } /* Chdir to the working dir */ if (chdir (db->dirname) == -1) { msg_err ("chdir failed: %s", strerror (errno)); goto err; } /* Create directories for backend */ if (!rspamd_recursive_mkdir (db->levels)) { goto err; } db->initialized = TRUE; if (chdir (pathbuf) == -1) { msg_err ("chdir to %s failed: %s", pathbuf, strerror (errno)); } return; err: if (pathbuf[0] != '\0') { if (chdir (pathbuf) == -1) { msg_err ("chdir to %s failed: %s", pathbuf, strerror (errno)); } } } static gboolean rspamd_file_insert (struct rspamd_kv_backend *backend, gpointer key, guint keylen, struct rspamd_kv_element *elt) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; struct file_op *op; struct rspamd_kv_element search_elt; search_elt.keylen = keylen; search_elt.p = key; if (!db->initialized) { return FALSE; } if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) { /* We found another op with such key in this queue */ if (op->op == FILE_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) { /* Also clean memory */ g_hash_table_steal (db->ops_hash, &search_elt); g_slice_free1 (ELT_SIZE (op->elt), op->elt); } op->op = FILE_OP_INSERT; op->ref ++; op->elt = elt; elt->flags |= KV_ELT_DIRTY; g_hash_table_insert (db->ops_hash, elt, op); } else { op = g_slice_alloc (sizeof (struct file_op)); op->op = FILE_OP_INSERT; op->elt = elt; op->ref = 1; elt->flags |= KV_ELT_DIRTY; g_queue_push_head (db->ops_queue, op); g_hash_table_insert (db->ops_hash, elt, op); } if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) { return file_process_queue (backend); } return TRUE; } static gboolean rspamd_file_replace (struct rspamd_kv_backend *backend, gpointer key, guint keylen, struct rspamd_kv_element *elt) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; struct file_op *op; struct rspamd_kv_element search_elt; search_elt.keylen = keylen; search_elt.p = key; if (!db->initialized) { return FALSE; } if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) { /* We found another op with such key in this queue */ if (op->op == FILE_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) { /* Also clean memory */ g_hash_table_steal (db->ops_hash, &search_elt); g_slice_free1 (ELT_SIZE (op->elt), op->elt); } op->op = FILE_OP_REPLACE; op->elt = elt; elt->flags |= KV_ELT_DIRTY; g_hash_table_insert (db->ops_hash, elt, op); } else { op = g_slice_alloc (sizeof (struct file_op)); op->op = FILE_OP_REPLACE; op->elt = elt; op->ref = 1; elt->flags |= KV_ELT_DIRTY; g_queue_push_head (db->ops_queue, op); g_hash_table_insert (db->ops_hash, elt, op); } if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) { return file_process_queue (backend); } return TRUE; } static struct rspamd_kv_element* rspamd_file_lookup (struct rspamd_kv_backend *backend, gpointer key, guint keylen) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; struct file_op *op; struct rspamd_kv_element *elt = NULL; gchar filebuf[PATH_MAX]; gint fd; struct rspamd_kv_element search_elt; gsize len; search_elt.keylen = keylen; search_elt.p = key; if (!db->initialized) { return NULL; } /* First search in ops queue */ if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) { if (op->op == FILE_OP_DELETE) { /* To delete, so assume it as not found */ return NULL; } return op->elt; } /* Get filename */ if (!get_file_name (db, key, keylen, filebuf, sizeof (filebuf))) { return NULL; } if ((fd = file_open_fd (filebuf, &len, O_RDONLY)) == -1) { return NULL; } /* Read element */ if (db->do_ref) { lseek (fd, sizeof (guint32), SEEK_CUR); len -= sizeof (guint32); } elt = g_malloc (len); if (read (fd, elt, len) == -1) { g_free (elt); close (fd); return NULL; } close (fd); elt->flags &= ~(KV_ELT_DIRTY|KV_ELT_NEED_FREE); return elt; } static void rspamd_file_delete (struct rspamd_kv_backend *backend, gpointer key, guint keylen) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; gchar filebuf[PATH_MAX]; struct rspamd_kv_element search_elt; struct file_op *op; gsize len; gint fd; guint32 ref; if (!db->initialized) { return; } search_elt.keylen = keylen; search_elt.p = key; /* First search in ops queue */ if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) { op->op = FILE_OP_DELETE; if (op->ref > 0) { op->ref --; } return; } /* Get filename */ if (!get_file_name (db, key, keylen, filebuf, sizeof (filebuf))) { return; } if (db->do_ref) { if ((fd = file_open_fd (filebuf, &len, O_RDWR)) == -1) { return; } if ((ref = file_get_ref (fd)) <= 1) { /* Refcount is not enough, remove file */ close (fd); unlink (filebuf); } else { /* Decrease ref */ lseek (fd, 0, SEEK_SET); file_set_ref (fd, --ref); } return; } unlink (filebuf); } static gboolean rspamd_file_incref (struct rspamd_kv_backend *backend, gpointer key, guint keylen) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; gchar filebuf[PATH_MAX]; struct rspamd_kv_element search_elt; struct file_op *op; gsize len; gint fd; guint32 ref; if (!db->initialized) { return FALSE; } if (!db->do_ref) { return TRUE; } search_elt.keylen = keylen; search_elt.p = key; /* First search in ops queue */ if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) { op->ref ++; if (op->op == FILE_OP_DELETE) { op->op = FILE_OP_INSERT; } return TRUE; } /* Get filename */ if (!get_file_name (db, key, keylen, filebuf, sizeof (filebuf))) { return FALSE; } if ((fd = file_open_fd (filebuf, &len, O_RDWR)) == -1) { return FALSE; } ref = file_get_ref (fd); /* Decrease ref */ lseek (fd, 0, SEEK_SET); if (file_set_ref (fd, ++ref)) { close (fd); return TRUE; } else { close (fd); return FALSE; } } static void rspamd_file_destroy (struct rspamd_kv_backend *backend) { struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend; if (db->initialized) { file_process_queue (backend); g_free (db->filename); g_free (db->dirname); g_queue_free (db->ops_queue); g_hash_table_unref (db->ops_hash); g_slice_free1 (sizeof (struct rspamd_file_backend), db); /* Sync again */ sync (); } } /* Create new file backend */ struct rspamd_kv_backend * rspamd_kv_file_new (const gchar *filename, guint sync_ops, guint levels, gboolean do_fsync, gboolean do_ref) { struct rspamd_file_backend *new; struct stat st; gchar *dirname; if (filename == NULL) { return NULL; } dirname = g_path_get_dirname (filename); if (dirname == NULL || stat (dirname, &st) == -1 || !S_ISDIR (st.st_mode)) { /* Inaccessible path */ if (dirname != NULL) { g_free (dirname); } msg_err ("invalid file: %s", filename); return NULL; } new = g_slice_alloc0 (sizeof (struct rspamd_file_backend)); new->dirname = dirname; new->dirlen = strlen (dirname); new->filename = g_strdup (filename); new->sync_ops = sync_ops; new->levels = levels; new->do_fsync = do_fsync; new->do_ref = do_ref; new->ops_queue = g_queue_new (); new->ops_hash = g_hash_table_new (kv_elt_hash_func, kv_elt_compare_func); /* Init callbacks */ new->init_func = rspamd_file_init; new->insert_func = rspamd_file_insert; new->lookup_func = rspamd_file_lookup; new->delete_func = rspamd_file_delete; new->replace_func = rspamd_file_replace; new->sync_func = file_process_queue; new->incref_func = rspamd_file_incref; new->destroy_func = rspamd_file_destroy; return (struct rspamd_kv_backend *)new; }