* Add <ref> flag for file backend that enables reference count for items in this backend.
/** Create new kv storage */
struct rspamd_kv_storage *
rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache, struct rspamd_kv_backend *backend, struct rspamd_kv_expire *expire,
- gsize max_elts, gsize max_memory)
+ gsize max_elts, gsize max_memory, gboolean no_overwrite)
{
struct rspamd_kv_storage *new;
new->id = id;
+ new->no_overwrite = no_overwrite;
+
if (name != NULL) {
new->name = g_strdup (name);
}
elt = storage->cache->insert_func (storage->cache, key, keylen, data, len);
- if (elt == NULL) {
- g_static_rw_lock_writer_unlock (&storage->rwlock);
- return FALSE;
- }
+
/* Copy data */
elt->flags = flags;
elt->expire = expire;
elt = storage->cache->lookup_func (storage->cache, key, keylen);
if (elt) {
- if (storage->expire) {
- storage->expire->delete_func (storage->expire, elt);
- }
- storage->memory -= ELT_SIZE (elt);
- storage->cache->steal_func (storage->cache, elt);
- if (elt->flags & KV_ELT_DIRTY) {
- /* Element is in backend storage queue */
- elt->flags |= KV_ELT_NEED_FREE;
+ if (!storage->no_overwrite) {
+ /* Remove old elt */
+ if (storage->expire) {
+ storage->expire->delete_func (storage->expire, elt);
+ }
+ storage->memory -= ELT_SIZE (elt);
+ storage->cache->steal_func (storage->cache, elt);
+ if (elt->flags & KV_ELT_DIRTY) {
+ /* Element is in backend storage queue */
+ elt->flags |= KV_ELT_NEED_FREE;
+ }
+ else {
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ }
}
else {
- g_slice_free1 (ELT_SIZE (elt), elt);
+ /* Just do incref and nothing more */
+ if (storage->backend && storage->backend->incref_func) {
+ if (storage->backend->incref_func (storage->backend, key, keylen)) {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
+ return TRUE;
+ }
+ else {
+ g_static_rw_lock_writer_unlock (&storage->rwlock);
+ return FALSE;
+ }
+ }
}
}
/* Callbacks for cache */
typedef void (*cache_init)(struct rspamd_kv_cache *cache);
-typedef struct rspamd_kv_element* (*cache_insert)(struct rspamd_kv_cache *cache, gpointer key, guint keylen, gpointer value, gsize len);
-typedef gboolean (*cache_replace)(struct rspamd_kv_cache *cache, gpointer key, guint keylen, struct rspamd_kv_element *elt);
+typedef struct rspamd_kv_element* (*cache_insert)(struct rspamd_kv_cache *cache,
+ gpointer key, guint keylen, gpointer value, gsize len);
+typedef gboolean (*cache_replace)(struct rspamd_kv_cache *cache, gpointer key, guint keylen,
+ struct rspamd_kv_element *elt);
typedef struct rspamd_kv_element* (*cache_lookup)(struct rspamd_kv_cache *cache, gpointer key, guint keylen);
typedef struct rspamd_kv_element* (*cache_delete)(struct rspamd_kv_cache *cache, gpointer key, guint keylen);
typedef void (*cache_steal)(struct rspamd_kv_cache *cache, struct rspamd_kv_element* elt);
/* Callbacks for backend */
typedef void (*backend_init)(struct rspamd_kv_backend *backend);
-typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer key, guint keylen, struct rspamd_kv_element *elt);
-typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, guint keylen, struct rspamd_kv_element *elt);
-typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key, guint keylen);
+typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer key, guint keylen,
+ struct rspamd_kv_element *elt);
+typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, guint keylen,
+ struct rspamd_kv_element *elt);
+typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key,
+ guint keylen);
typedef void (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key, guint keylen);
typedef gboolean (*backend_sync)(struct rspamd_kv_backend *backend);
+typedef gboolean (*backend_incref)(struct rspamd_kv_backend *backend, gpointer key, guint keylen);
typedef void (*backend_destroy)(struct rspamd_kv_backend *backend);
/* Callbacks for expire */
typedef void (*expire_init)(struct rspamd_kv_expire *expire);
typedef void (*expire_insert)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt);
typedef void (*expire_delete)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt);
-typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage, time_t now, gboolean forced);
+typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage,
+ time_t now, gboolean forced);
typedef void (*expire_destroy)(struct rspamd_kv_expire *expire);
backend_lookup lookup_func; /*< this callback is used for lookup of element */
backend_delete delete_func; /*< this callback is called when an element is deleted */
backend_sync sync_func; /*< this callback is called when backend need to be synced */
+ backend_incref incref_func; /*< this callback is called when element must be ref'd */
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
};
struct rspamd_kv_expire {
gint id; /* char ID */
gchar *name; /* numeric ID */
+
+ gboolean no_overwrite; /* do not overwrite data with the same keys */
GStaticRWLock rwlock; /* rwlock for threaded access */
};
struct rspamd_kv_storage *rspamd_kv_storage_new (gint id, const gchar *name,
struct rspamd_kv_cache *cache, struct rspamd_kv_backend *backend,
struct rspamd_kv_expire *expire,
- gsize max_elts, gsize max_memory);
+ gsize max_elts, gsize max_memory, gboolean no_overwrite);
/** Insert new element to the kv storage */
gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, guint keylen, gpointer data, gsize len, gint flags, guint expire);
KVSTORAGE_STATE_CACHE_TYPE,
KVSTORAGE_STATE_CACHE_MAX_ELTS,
KVSTORAGE_STATE_CACHE_MAX_MEM,
+ KVSTORAGE_STATE_CACHE_NO_OVERWRITE,
KVSTORAGE_STATE_BACKEND_TYPE,
KVSTORAGE_STATE_BACKEND_FILENAME,
KVSTORAGE_STATE_BACKEND_SYNC_OPS,
KVSTORAGE_STATE_BACKEND_DO_FSYNC,
+ KVSTORAGE_STATE_BACKEND_DO_REF,
KVSTORAGE_STATE_EXPIRE_TYPE,
KVSTORAGE_STATE_ERROR
} state;
break;
case KVSTORAGE_TYPE_BACKEND_FILE:
backend = rspamd_kv_file_new (kconf->backend.filename, kconf->backend.sync_ops,
- FILE_STORAGE_LEVELS, kconf->backend.do_fsync);
+ FILE_STORAGE_LEVELS, kconf->backend.do_fsync, kconf->backend.do_ref);
break;
#ifdef WITH_DB
case KVSTORAGE_TYPE_BACKEND_BDB:
}
kconf->storage = rspamd_kv_storage_new (kconf->id, kconf->name, cache, backend, expire,
- kconf->cache.max_elements, kconf->cache.max_memory);
+ kconf->cache.max_elements, kconf->cache.max_memory, kconf->cache.no_overwrite);
}
/* XML parse callbacks */
kv_parser->state = KVSTORAGE_STATE_CACHE_MAX_MEM;
kv_parser->cur_elt = "max_memory";
}
+ else if (g_ascii_strcasecmp (element_name, "no_overwrite") == 0) {
+ kv_parser->state = KVSTORAGE_STATE_CACHE_NO_OVERWRITE;
+ kv_parser->cur_elt = "no_overwrite";
+ }
else if (g_ascii_strcasecmp (element_name, "id") == 0) {
kv_parser->state = KVSTORAGE_STATE_ID;
kv_parser->cur_elt = "id";
kv_parser->state = KVSTORAGE_STATE_BACKEND_DO_FSYNC;
kv_parser->cur_elt = "fsync";
}
+ else if (g_ascii_strcasecmp (element_name, "ref") == 0) {
+ kv_parser->state = KVSTORAGE_STATE_BACKEND_DO_REF;
+ kv_parser->cur_elt = "ref";
+ }
else {
if (*error == NULL) {
*error = g_error_new (xml_error_quark (), XML_EXTRA_ELEMENT, "element %s is unexpected in backend definition",
case KVSTORAGE_STATE_CACHE_TYPE:
case KVSTORAGE_STATE_CACHE_MAX_ELTS:
case KVSTORAGE_STATE_CACHE_MAX_MEM:
+ case KVSTORAGE_STATE_CACHE_NO_OVERWRITE:
CHECK_TAG (KVSTORAGE_STATE_PARAM);
break;
case KVSTORAGE_STATE_BACKEND_TYPE:
case KVSTORAGE_STATE_BACKEND_FILENAME:
case KVSTORAGE_STATE_BACKEND_SYNC_OPS:
case KVSTORAGE_STATE_BACKEND_DO_FSYNC:
+ case KVSTORAGE_STATE_BACKEND_DO_REF:
CHECK_TAG (KVSTORAGE_STATE_BACKEND);
break;
case KVSTORAGE_STATE_EXPIRE_TYPE:
case KVSTORAGE_STATE_CACHE_MAX_MEM:
kv_parser->current_storage->cache.max_memory = parse_limit (text, text_len);
break;
+ case KVSTORAGE_STATE_CACHE_NO_OVERWRITE:
+ kv_parser->current_storage->cache.no_overwrite = parse_flag (text);
+ break;
case KVSTORAGE_STATE_CACHE_TYPE:
if (g_ascii_strncasecmp (text, "hash", MIN (text_len, sizeof ("hash") - 1)) == 0) {
kv_parser->current_storage->cache.type = KVSTORAGE_TYPE_CACHE_HASH;
case KVSTORAGE_STATE_BACKEND_DO_FSYNC:
kv_parser->current_storage->backend.do_fsync = parse_flag (text);
break;
+ case KVSTORAGE_STATE_BACKEND_DO_REF:
+ kv_parser->current_storage->backend.do_ref = parse_flag (text);
+ break;
case KVSTORAGE_STATE_EXPIRE_TYPE:
if (g_ascii_strncasecmp (text, "lru", MIN (text_len, sizeof ("lru") - 1)) == 0) {
kv_parser->current_storage->expire.type = KVSTORAGE_TYPE_EXPIRE_LRU;
struct kvstorage_cache_config {
gsize max_elements;
gsize max_memory;
+ gboolean no_overwrite;
enum kvstorage_cache_type type;
};
gchar *filename;
guint sync_ops;
gboolean do_fsync;
+ gboolean do_ref;
};
FILE_OP_DELETE,
FILE_OP_REPLACE
} op;
+ guint32 ref;
};
/* Main file structure */
backend_lookup lookup_func; /*< this callback is used for lookup of element */
backend_delete delete_func; /*< this callback is called when an element is deleted */
backend_sync sync_func; /*< this callback is called when backend need to be synced */
+ backend_incref incref_func; /*< this callback is called when element must be ref'd */
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
gchar *filename;
gchar *dirname;
GQueue *ops_queue;
GHashTable *ops_hash;
gboolean do_fsync;
+ gboolean do_ref;
gboolean initialized;
};
return TRUE;
}
+/* Read reference from specified file */
+static guint32
+file_get_ref (gint fd)
+{
+ guint32 target;
+
+ if (read (fd, &target, sizeof (guint32)) != sizeof (guint32)) {
+ return 0;
+ }
+
+ return target;
+}
+
+/* Set reference to specified file */
+static gboolean
+file_set_ref (gint fd, guint32 ref)
+{
+ if (write (fd, &ref, sizeof (guint32)) != sizeof (guint32)) {
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/*
+ * Open file, set posix_fadvise and all necessary flags
+ */
+static gint
+file_open_fd (const gchar *path, gsize *len, gint flags)
+{
+ gint fd;
+ struct stat st;
+
+ if ((flags & O_CREAT) != 0) {
+ /* Open file */
+ if ((fd = open (path, flags, S_IRUSR|S_IWUSR|S_IRGRP)) != -1) {
+#ifdef HAVE_FADVISE
+ posix_fadvise (fd, 0, *len, POSIX_FADV_SEQUENTIAL);
+#endif
+ }
+ }
+ else {
+ /* Open file */
+ if ((fd = open (path, flags)) == -1) {
+ return -1;
+ }
+
+ if (fstat (fd, &st) == -1) {
+ close (fd);
+ return -1;
+ }
+
+#ifdef HAVE_FADVISE
+ posix_fadvise (fd, 0, st.st_size, POSIX_FADV_SEQUENTIAL);
+#endif
+ *len = st.st_size;
+ }
+
+ return fd;
+}
+
/* Process single file operation */
static gboolean
file_process_single_op (struct rspamd_file_backend *db, struct file_op *op, gint *pfd)
{
gchar filebuf[PATH_MAX];
- gint fd, flags;
+ gint fd;
+ gsize len;
+ struct iovec iov[2];
+ guint32 ref;
/* Get filename */
if (!get_file_name (db, ELT_KEY (op->elt), op->elt->keylen, filebuf, sizeof (filebuf))) {
return FALSE;
}
+ if (db->do_ref) {
+ len = ELT_SIZE (op->elt) + sizeof (guint32);
+ }
+ else {
+ len = ELT_SIZE (op->elt);
+ }
+
if (op->op == FILE_OP_DELETE) {
- *pfd = -1;
- return unlink (filebuf) != -1;
+ if (db->do_ref) {
+ if ((fd = file_open_fd (filebuf, &len, O_RDWR)) == -1) {
+ *pfd = -1;
+ return FALSE;
+ }
+ if ((ref = file_get_ref (fd)) <= 1) {
+ /* Refcount is not enough, remove file */
+ close (fd);
+ *pfd = -1;
+ return unlink (filebuf) != -1;
+ }
+ else {
+ /* Decrease ref */
+ lseek (fd, 0, SEEK_SET);
+ if (! file_set_ref (fd, --ref)) {
+ *pfd = fd;
+ return FALSE;
+ }
+ }
+ }
+ else {
+ *pfd = -1;
+ return unlink (filebuf) != -1;
+ }
}
else {
-#ifdef HAVE_O_DIRECT
- flags = O_CREAT|O_TRUNC|O_WRONLY;
-#else
- flags = O_CREAT|O_TRUNC|O_WRONLY;
-#endif
- /* Open file */
- if ((fd = open (filebuf, flags, S_IRUSR|S_IWUSR|S_IRGRP)) == -1) {
+ if ((fd = file_open_fd (filebuf, &len, O_CREAT|O_WRONLY|O_TRUNC)) == -1) {
*pfd = -1;
return FALSE;
}
-
-#ifdef HAVE_FADVISE
- posix_fadvise (fd, 0, ELT_SIZE (op->elt), POSIX_FADV_SEQUENTIAL);
-#endif
- if (write (fd, op->elt, ELT_SIZE (op->elt)) == -1) {
- msg_info ("%d: %s", errno, strerror (errno));
- *pfd = fd;
- return FALSE;
+ if (db->do_ref) {
+ iov[0].iov_base = &op->ref;
+ iov[0].iov_len = sizeof (guint32);
+ iov[1].iov_base = op->elt;
+ iov[1].iov_len = ELT_SIZE (op->elt);
+ if (writev (fd, iov, G_N_ELEMENTS (iov)) == -1) {
+ msg_info ("%d: %s", errno, strerror (errno));
+ *pfd = fd;
+ return FALSE;
+ }
+ }
+ else {
+ if (write (fd, op->elt, ELT_SIZE (op->elt)) == -1) {
+ msg_info ("%d: %s", errno, strerror (errno));
+ *pfd = fd;
+ return FALSE;
+ }
}
}
static void
file_sync_fds (gint *fds, gint len, gboolean do_fsync)
{
- gint i, fd;
+ gint i, fd;
for (i = 0; i < len; i ++) {
fd = fds[i];
g_slice_free1 (ELT_SIZE (op->elt), op->elt);
}
op->op = FILE_OP_INSERT;
+ op->ref ++;
op->elt = elt;
elt->flags |= KV_ELT_DIRTY;
g_hash_table_insert (db->ops_hash, elt, op);
op = g_slice_alloc (sizeof (struct file_op));
op->op = FILE_OP_INSERT;
op->elt = elt;
+ op->ref = 1;
elt->flags |= KV_ELT_DIRTY;
g_queue_push_head (db->ops_queue, op);
op = g_slice_alloc (sizeof (struct file_op));
op->op = FILE_OP_REPLACE;
op->elt = elt;
+ op->ref = 1;
elt->flags |= KV_ELT_DIRTY;
g_queue_push_head (db->ops_queue, op);
struct file_op *op;
struct rspamd_kv_element *elt = NULL;
gchar filebuf[PATH_MAX];
- gint fd, flags;
- struct stat st;
+ gint fd;
struct rspamd_kv_element search_elt;
+ gsize len;
search_elt.keylen = keylen;
search_elt.p = key;
return NULL;
}
-#ifdef HAVE_O_DIRECT
- flags = O_RDONLY;
-#else
- flags = O_RDONLY;
-#endif
- /* Open file */
- if ((fd = open (filebuf, flags)) == -1) {
- return NULL;
- }
-
- if (fstat (fd, &st) == -1) {
+ if ((fd = file_open_fd (filebuf, &len, O_RDONLY)) == -1) {
return NULL;
}
-#ifdef HAVE_FADVISE
- posix_fadvise (fd, 0, st.st_size, POSIX_FADV_SEQUENTIAL);
-#endif
-
/* Read element */
- elt = g_malloc (st.st_size);
- if (read (fd, elt, st.st_size) == -1) {
+ if (db->do_ref) {
+ lseek (fd, sizeof (guint32), SEEK_CUR);
+ len -= sizeof (guint32);
+ }
+ elt = g_malloc (len);
+ if (read (fd, elt, len) == -1) {
g_free (elt);
close (fd);
return NULL;
gchar filebuf[PATH_MAX];
struct rspamd_kv_element search_elt;
struct file_op *op;
+ gsize len;
+ gint fd;
+ guint32 ref;
if (!db->initialized) {
return;
/* First search in ops queue */
if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) {
op->op = FILE_OP_DELETE;
+ if (op->ref > 0) {
+ op->ref --;
+ }
return;
}
/* Get filename */
return;
}
+ if (db->do_ref) {
+ if ((fd = file_open_fd (filebuf, &len, O_RDWR)) == -1) {
+ return;
+ }
+ if ((ref = file_get_ref (fd)) <= 1) {
+ /* Refcount is not enough, remove file */
+ close (fd);
+ unlink (filebuf);
+ }
+ else {
+ /* Decrease ref */
+ lseek (fd, 0, SEEK_SET);
+ file_set_ref (fd, --ref);
+ }
+ return;
+ }
+
unlink (filebuf);
}
+static gboolean
+rspamd_file_incref (struct rspamd_kv_backend *backend, gpointer key, guint keylen)
+{
+ struct rspamd_file_backend *db = (struct rspamd_file_backend *)backend;
+ gchar filebuf[PATH_MAX];
+ struct rspamd_kv_element search_elt;
+ struct file_op *op;
+ gsize len;
+ gint fd;
+ guint32 ref;
+
+ if (!db->initialized) {
+ return FALSE;
+ }
+ if (!db->do_ref) {
+ return TRUE;
+ }
+
+ search_elt.keylen = keylen;
+ search_elt.p = key;
+ /* First search in ops queue */
+ if ((op = g_hash_table_lookup (db->ops_hash, &search_elt)) != NULL) {
+ op->ref ++;
+ if (op->op == FILE_OP_DELETE) {
+ op->op = FILE_OP_INSERT;
+ }
+ return TRUE;
+ }
+
+ /* Get filename */
+ if (!get_file_name (db, key, keylen, filebuf, sizeof (filebuf))) {
+ return FALSE;
+ }
+
+ if ((fd = file_open_fd (filebuf, &len, O_RDWR)) == -1) {
+ return FALSE;
+ }
+
+ ref = file_get_ref (fd);
+
+ /* Decrease ref */
+ lseek (fd, 0, SEEK_SET);
+
+ if (file_set_ref (fd, ++ref)) {
+ close (fd);
+ return TRUE;
+ }
+ else {
+ close (fd);
+ return FALSE;
+ }
+}
+
static void
rspamd_file_destroy (struct rspamd_kv_backend *backend)
{
/* Create new file backend */
struct rspamd_kv_backend *
-rspamd_kv_file_new (const gchar *filename, guint sync_ops, guint levels, gboolean do_fsync)
+rspamd_kv_file_new (const gchar *filename, guint sync_ops, guint levels, gboolean do_fsync, gboolean do_ref)
{
struct rspamd_file_backend *new;
struct stat st;
new->sync_ops = sync_ops;
new->levels = levels;
new->do_fsync = do_fsync;
+ new->do_ref = do_ref;
new->ops_queue = g_queue_new ();
new->ops_hash = g_hash_table_new (kv_elt_hash_func, kv_elt_compare_func);
new->delete_func = rspamd_file_delete;
new->replace_func = rspamd_file_replace;
new->sync_func = file_process_queue;
+ new->incref_func = rspamd_file_incref;
new->destroy_func = rspamd_file_destroy;
return (struct rspamd_kv_backend *)new;
#include "kvstorage.h"
/* Create new file backend */
-struct rspamd_kv_backend* rspamd_kv_file_new (const gchar *filename, guint sync_ops, guint levels, gboolean do_fsync);
+struct rspamd_kv_backend* rspamd_kv_file_new (const gchar *filename, guint sync_ops,
+ guint levels, gboolean do_fsync, gboolean do_ref);
#endif /* KVSTORAGE_FILE_H_ */