aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/kvstorage.c496
-rw-r--r--src/kvstorage.h31
2 files changed, 510 insertions, 17 deletions
diff --git a/src/kvstorage.c b/src/kvstorage.c
index 1371df9f5..2cfad9459 100644
--- a/src/kvstorage.c
+++ b/src/kvstorage.c
@@ -24,6 +24,7 @@
#include "config.h"
#include "kvstorage.h"
#include "main.h"
+#include "radix.h"
#define MAX_EXPIRE_STEPS 10
@@ -56,9 +57,63 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache
rspamd_snprintf (new->name, sizeof ("18446744073709551616"), "%d", id);
}
+ /* Init structures */
+ if (new->cache->init_func) {
+ new->cache->init_func (new->cache);
+ }
+ if (new->backend && new->backend->init_func) {
+ new->backend->init_func (new->backend);
+ }
+ if (new->expire && new->expire->init_func) {
+ new->expire->init_func (new->expire);
+ }
+
return new;
}
+/** Internal insertion to the kv storage from backend */
+gboolean
+rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, struct rspamd_kv_element *elt)
+{
+ gint steps = 0;
+
+ /* Hard limit */
+ if (elt->size > storage->max_memory) {
+ msg_info ("<%s>: trying to insert value of length %z while limit is %z", elt->size, storage->max_memory);
+ return FALSE;
+ }
+
+ /* Now check limits */
+ while (storage->memory + elt->size > storage->max_memory || storage->elts >= storage->max_elts) {
+ if (storage->expire) {
+ storage->expire->step_func (storage->expire, storage, time (NULL));
+ }
+ else {
+ msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name);
+ }
+ if (++steps > MAX_EXPIRE_STEPS) {
+ msg_warn ("<%s>: cannot expire enough keys in storage", storage->name);
+ return FALSE;
+ }
+ }
+
+ /* Insert elt to the cache */
+ elt = storage->cache->insert_func (storage->cache, elt->key, elt->data, elt->size);
+ if (elt == NULL) {
+ return FALSE;
+ }
+
+ /* Insert to the expire */
+ if (storage->expire) {
+ storage->expire->insert_func (storage->expire, elt);
+ }
+
+ storage->elts ++;
+ storage->memory += elt->size + sizeof (struct rspamd_kv_element);
+
+ return TRUE;
+}
+
/** Insert new element to the kv storage */
gboolean
rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags)
@@ -76,7 +131,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpoin
/* Now check limits */
while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
if (storage->expire) {
- storage->expire->step_func (storage->expire, storage);
+ storage->expire->step_func (storage->expire, storage, time (NULL));
}
else {
msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name);
@@ -93,6 +148,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpoin
return FALSE;
}
elt->flags = flags;
+ elt->size = len;
/* Place to the backend */
if (storage->backend) {
@@ -126,7 +182,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru
/* Now check limits */
while (storage->memory + elt->size > storage->max_memory) {
if (storage->expire) {
- storage->expire->step_func (storage->expire, storage);
+ storage->expire->step_func (storage->expire, storage, time (NULL));
}
else {
msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name);
@@ -150,7 +206,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru
/** Lookup an element inside kv storage */
struct rspamd_kv_element*
-rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key)
+rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, time_t now)
{
struct rspamd_kv_element *elt = NULL;
@@ -158,8 +214,20 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key)
elt = storage->cache->lookup_func (storage->cache, key);
/* Next look at the backend */
- if (storage->backend) {
+ if (elt == NULL && storage->backend) {
elt = storage->backend->lookup_func (storage->backend, key);
+ if (elt) {
+ /* Put this element into cache */
+ rspamd_kv_storage_insert_internal (storage, elt);
+ }
+ }
+
+ if (elt && (elt->flags & KV_ELT_PERSISTENT) == 0) {
+ /* Check expiration */
+ if (now - elt->age > elt->expire) {
+ rspamd_kv_storage_delete (storage, key);
+ elt = NULL;
+ }
}
return elt;
@@ -169,35 +237,439 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key)
gboolean
rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key)
{
- gboolean res = TRUE;
+ struct rspamd_kv_element *elt;
/* First delete key from cache */
- res = storage->cache->delete_func (storage->cache, key);
+ elt = storage->cache->delete_func (storage->cache, key);
/* Now delete from backend */
if (storage->backend) {
- res = storage->backend->delete_func (storage->backend, key);
+ if (elt == NULL) {
+ elt = storage->backend->delete_func (storage->backend, key);
+ }
+ else {
+ storage->backend->delete_func (storage->backend, key);
+ }
}
/* Notify expire */
- /* XXX: implement this */
+ if (elt) {
+ storage->expire->delete_func (storage->expire, elt);
+ storage->elts --;
+ storage->memory -= elt->size;
+ }
- return res;
+ return elt != NULL;
}
/** Destroy kv storage */
void
rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage)
{
- if (storage->cache) {
+ if (storage->cache && storage->cache->destroy_func) {
storage->cache->destroy_func (storage->cache);
}
- if (storage->backend) {
+ if (storage->backend && storage->backend->destroy_func) {
storage->backend->destroy_func (storage->backend);
}
- if (storage->expire) {
+ if (storage->expire && storage->expire->destroy_func) {
storage->expire->destroy_func (storage->expire);
}
g_free (storage->name);
g_slice_free1 (sizeof (struct rspamd_kv_storage), storage);
}
+
+
+/**
+ * LRU expire functions
+ */
+
+struct rspamd_kv_lru_expire {
+ expire_init init_func; /*< this callback is called on kv storage initialization */
+ expire_insert insert_func; /*< this callback is called when element is inserted */
+ expire_step step_func; /*< this callback is used when cache is full */
+ expire_delete delete_func; /*< this callback is called when an element is deleted */
+ expire_destroy destroy_func; /*< this callback is used for destroying all elements inside expire */
+
+ guint queues;
+ TAILQ_HEAD (eltq, rspamd_kv_element) *heads;
+ guint *heads_elts;
+};
+
+/**
+ * Insert an element into expire queue
+ */
+static void
+rspamd_lru_insert (struct rspamd_kv_expire *e, struct rspamd_kv_element *elt)
+{
+ struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e;
+ guint sel_head;
+
+ /* Get a proper queue */
+ sel_head = elt->hash % expire->queues;
+ TAILQ_INSERT_HEAD (&expire->heads[sel_head], elt, entry);
+ expire->heads_elts[sel_head] ++;
+}
+/**
+ * Delete an element from expire queue
+ */
+static void
+rspamd_lru_delete (struct rspamd_kv_expire *e, struct rspamd_kv_element *elt)
+{
+ struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e;
+ guint sel_head;
+
+ /* Get a proper queue */
+ sel_head = elt->hash % expire->queues;
+ /* Unlink element */
+ TAILQ_REMOVE (&expire->heads[sel_head], elt, entry);
+ expire->heads_elts[sel_head] --;
+}
+
+/**
+ * Expire elements
+ */
+static gboolean
+rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *storage, time_t now)
+{
+ guint i;
+ struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e;
+ struct rspamd_kv_element *elt, *oldest_elt, *temp;
+ time_t min_diff = G_MAXLONG, diff;
+ gboolean res = FALSE;
+
+ for (i = 0; i < expire->queues; i ++) {
+ elt = expire->heads[i].tqh_first;
+ if (elt && (elt->flags & KV_ELT_PERSISTENT) == 0) {
+ diff = elt->expire - (now - elt->age);
+ if (diff > 0) {
+ /* This element is not expired */
+ if (diff < min_diff) {
+ min_diff = diff;
+ oldest_elt = elt;
+ }
+ }
+ else {
+ /* This element is already expired */
+ rspamd_kv_storage_delete (storage, elt->key);
+ res = TRUE;
+ /* Check other elements in this queue */
+ TAILQ_FOREACH_SAFE (elt, &expire->heads[i], entry, temp) {
+ if ((elt->flags & KV_ELT_PERSISTENT) != 0 || elt->expire < (now - elt->age)) {
+ break;
+ }
+ rspamd_kv_storage_delete (storage, elt->key);
+ }
+ break;
+ }
+ }
+ }
+
+ if (!res) {
+ /* Oust the oldest element from cache */
+ storage->cache->delete_func (storage->cache, oldest_elt->key);
+ oldest_elt->flags |= KV_ELT_OUSTED;
+ storage->memory -= oldest_elt->size + sizeof (*elt);
+ storage->elts --;
+ rspamd_lru_delete (e, oldest_elt);
+ }
+
+ return TRUE;
+}
+
+/**
+ * Destroy LRU expire memory
+ */
+static void
+rspamd_lru_destroy (struct rspamd_kv_expire *e)
+{
+ struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e;
+
+ g_slice_free1 (sizeof (struct eltq) * expire->queues, expire->heads);
+ g_slice_free1 (sizeof (guint) * expire->queues, expire->heads_elts);
+ g_slice_free1 (sizeof (struct rspamd_kv_lru_expire), expire);
+}
+
+/**
+ * Create new LRU cache
+ */
+struct rspamd_kv_expire*
+rspamd_lru_expire_new (guint queues)
+{
+ struct rspamd_kv_lru_expire *new;
+ guint i;
+
+ new = g_slice_alloc (sizeof (struct rspamd_kv_lru_expire));
+ new->queues = queues;
+ new->heads = g_slice_alloc (sizeof (struct eltq) * queues);
+ new->heads_elts = g_slice_alloc0 (sizeof (guint) * queues);
+
+ for (i = 0; i < queues; i ++) {
+ TAILQ_INIT (&new->heads[i]);
+ }
+
+ /* Set callbacks */
+ new->init_func = NULL;
+ new->insert_func = rspamd_lru_insert;
+ new->delete_func = rspamd_lru_delete;
+ new->step_func = rspamd_lru_expire_step;
+ new->destroy_func = rspamd_lru_destroy;
+
+ return (struct rspamd_kv_expire *)new;
+}
+
+/*
+ * KV cache hash table
+ */
+struct rspamd_kv_hash_cache {
+ cache_init init_func; /*< this callback is called on kv storage initialization */
+ cache_insert insert_func; /*< this callback is called when element is inserted */
+ cache_replace replace_func; /*< this callback is called when element is replace */
+ cache_lookup lookup_func; /*< this callback is used for lookup of element */
+ cache_delete delete_func; /*< this callback is called when an element is deleted */
+ cache_destroy destroy_func; /*< this callback is used for destroying all elements inside cache */
+ GHashTable *hash;
+};
+
+/**
+ * Insert an element inside cache
+ */
+static struct rspamd_kv_element*
+rspamd_kv_hash_insert (struct rspamd_kv_cache *c, gpointer key, gpointer value, gsize len)
+{
+ struct rspamd_kv_element *elt;
+ struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c;
+
+ if ((elt = g_hash_table_lookup (cache->hash, key)) == NULL) {
+ elt = g_slice_alloc0 (sizeof (struct rspamd_kv_element) + len);
+ elt->age = time (NULL);
+ elt->key = key;
+ elt->size = len;
+ elt->hash = rspamd_strcase_hash (key);
+ memcpy (elt->data, value, len);
+ g_hash_table_insert (cache->hash, key, elt);
+ }
+
+ return elt;
+}
+
+/**
+ * Lookup an item inside hash
+ */
+static struct rspamd_kv_element*
+rspamd_kv_hash_lookup (struct rspamd_kv_cache *c, gpointer key)
+{
+ struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c;
+
+ return g_hash_table_lookup (cache->hash, key);
+}
+
+/**
+ * Replace an element inside cache
+ */
+static gboolean
+rspamd_kv_hash_replace (struct rspamd_kv_cache *c, gpointer key, struct rspamd_kv_element *elt)
+{
+ struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c;
+
+ g_hash_table_replace (cache->hash, key, elt);
+
+ return TRUE;
+}
+
+/**
+ * Delete an element from cache
+ */
+static struct rspamd_kv_element *
+rspamd_kv_hash_delete (struct rspamd_kv_cache *c, gpointer key)
+{
+ struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c;
+ struct rspamd_kv_element *elt;
+
+ elt = g_hash_table_lookup (cache->hash, key);
+ if (elt) {
+ g_hash_table_steal (cache->hash, key);
+ }
+ return elt;
+}
+
+/**
+ * Destroy the whole cache
+ */
+static void
+rspamd_kv_hash_destroy (struct rspamd_kv_cache *c)
+{
+ struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c;
+
+ g_hash_table_destroy (cache->hash);
+ g_slice_free1 (sizeof (struct rspamd_kv_hash_cache), cache);
+}
+
+/**
+ * Destroy kv_element structure
+ */
+static void
+kv_elt_destroy_func (gpointer e)
+{
+ struct rspamd_kv_element *elt = e;
+ g_slice_free1 (sizeof (struct rspamd_kv_element) + elt->size, elt);
+}
+
+/**
+ * Create new hash kv cache
+ */
+struct rspamd_kv_cache*
+rspamd_kv_hash_new (void)
+{
+ struct rspamd_kv_hash_cache *new;
+
+ new = g_slice_alloc (sizeof (struct rspamd_kv_hash_cache));
+ new->hash = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, NULL, kv_elt_destroy_func);
+ new->init_func = NULL;
+ new->insert_func = rspamd_kv_hash_insert;
+ new->lookup_func = rspamd_kv_hash_lookup;
+ new->replace_func = rspamd_kv_hash_replace;
+ new->delete_func = rspamd_kv_hash_delete;
+ new->destroy_func = rspamd_kv_hash_destroy;
+
+ return (struct rspamd_kv_cache *)new;
+}
+
+/*
+ * Radix cache hash table
+ */
+struct rspamd_kv_radix_cache {
+ cache_init init_func; /*< this callback is called on kv storage initialization */
+ cache_insert insert_func; /*< this callback is called when element is inserted */
+ cache_replace replace_func; /*< this callback is called when element is replace */
+ cache_lookup lookup_func; /*< this callback is used for lookup of element */
+ cache_delete delete_func; /*< this callback is called when an element is deleted */
+ cache_destroy destroy_func; /*< this callback is used for destroying all elements inside cache */
+ radix_tree_t *tree;
+};
+
+/**
+ * Validate a key for radix
+ */
+static guint32
+rspamd_kv_radix_validate (gpointer key)
+{
+ struct in_addr addr;
+
+ if (inet_aton (key, &addr) == 0) {
+ return 0;
+ }
+
+ return addr.s_addr;
+}
+
+/**
+ * Insert an element inside cache
+ */
+static struct rspamd_kv_element*
+rspamd_kv_radix_insert (struct rspamd_kv_cache *c, gpointer key, gpointer value, gsize len)
+{
+ struct rspamd_kv_element *elt;
+ struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c;
+ guint32 rkey = rspamd_kv_radix_validate (key);
+
+ if (rkey == 0) {
+ return NULL;
+ }
+ elt = (struct rspamd_kv_element *)radix32tree_find (cache->tree, rkey);
+ if ((uintptr_t)elt == RADIX_NO_VALUE) {
+ elt = g_slice_alloc0 (sizeof (struct rspamd_kv_element) + len);
+ elt->age = time (NULL);
+ elt->key = key;
+ elt->size = len;
+ elt->hash = rkey;
+ memcpy (elt->data, value, len);
+ radix32tree_insert (cache->tree, rkey, 0xffffffff, (uintptr_t)elt);
+ }
+
+ return elt;
+}
+
+/**
+ * Lookup an item inside radix
+ */
+static struct rspamd_kv_element*
+rspamd_kv_radix_lookup (struct rspamd_kv_cache *c, gpointer key)
+{
+ struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c;
+ guint32 rkey = rspamd_kv_radix_validate (key);
+ struct rspamd_kv_element *elt;
+
+ elt = (struct rspamd_kv_element *)radix32tree_find (cache->tree, rkey);
+ if ((uintptr_t)elt == RADIX_NO_VALUE) {
+ return NULL;
+ }
+
+ return elt;
+}
+
+/**
+ * Replace an element inside cache
+ */
+static gboolean
+rspamd_kv_radix_replace (struct rspamd_kv_cache *c, gpointer key, struct rspamd_kv_element *elt)
+{
+ struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c;
+ guint32 rkey = rspamd_kv_radix_validate (key);
+
+ radix32tree_replace (cache->tree, rkey, 0xffffffff, (uintptr_t)elt);
+
+ return TRUE;
+}
+
+/**
+ * Delete an element from cache
+ */
+static struct rspamd_kv_element *
+rspamd_kv_radix_delete (struct rspamd_kv_cache *c, gpointer key)
+{
+ struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c;
+ struct rspamd_kv_element *elt;
+ guint32 rkey = rspamd_kv_radix_validate (key);
+
+ elt = (struct rspamd_kv_element *)radix32tree_find (cache->tree, rkey);
+ if ((uintptr_t)elt != RADIX_NO_VALUE) {
+ radix32tree_delete (cache->tree, rkey, 0xffffffff);
+ }
+ else {
+ return NULL;
+ }
+ return elt;
+}
+
+/**
+ * Destroy the whole cache
+ */
+static void
+rspamd_kv_radix_destroy (struct rspamd_kv_cache *c)
+{
+ struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c;
+
+ radix_tree_free (cache->tree);
+ g_slice_free1 (sizeof (struct rspamd_kv_radix_cache), cache);
+}
+
+/**
+ * Create new radix kv cache
+ */
+struct rspamd_kv_cache*
+rspamd_kv_radix_new (void)
+{
+ struct rspamd_kv_radix_cache *new;
+
+ new = g_slice_alloc (sizeof (struct rspamd_kv_radix_cache));
+ new->tree = radix_tree_create ();
+ new->init_func = NULL;
+ new->insert_func = rspamd_kv_radix_insert;
+ new->lookup_func = rspamd_kv_radix_lookup;
+ new->replace_func = rspamd_kv_radix_replace;
+ new->delete_func = rspamd_kv_radix_delete;
+ new->destroy_func = rspamd_kv_radix_destroy;
+
+ return (struct rspamd_kv_cache *)new;
+}
diff --git a/src/kvstorage.h b/src/kvstorage.h
index 6caa3f4a1..4bb7f20a7 100644
--- a/src/kvstorage.h
+++ b/src/kvstorage.h
@@ -45,30 +45,35 @@ typedef void (*backend_init)(struct rspamd_kv_backend *backend);
typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt);
typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt);
typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key);
-typedef gboolean (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key);
+typedef struct rspamd_kv_element* (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key);
typedef void (*backend_destroy)(struct rspamd_kv_backend *backend);
/* Callbacks for expire */
typedef void (*expire_init)(struct rspamd_kv_expire *expire);
typedef void (*expire_insert)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt);
typedef void (*expire_delete)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt);
-typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage);
+typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage, time_t now);
typedef void (*expire_destroy)(struct rspamd_kv_expire *expire);
/* Flags of element */
enum rspamd_kv_flags {
KV_ELT_ARRAY = 1 << 0,
- KV_ELT_PERSISTEN = 1 << 1
+ KV_ELT_PERSISTENT = 1 << 1,
+ KV_ELT_DIRTY = 1 << 2,
+ KV_ELT_OUSTED = 1 << 3
};
/* Common structures description */
struct rspamd_kv_element {
time_t age; /*< age of element */
+ guint32 expire; /*< expire of element */
enum rspamd_kv_flags flags; /*< element flags */
gsize size; /*< size of element */
- TAILQ_ENTRY(rspamd_kv_element) entry; /*< list entry */
+ TAILQ_ENTRY (rspamd_kv_element) entry; /*< list entry */
+ guint32 hash; /*< numeric hash */
+ gpointer key; /*< pointer to key */
gchar data[1]; /*< expandable data */
};
@@ -126,7 +131,7 @@ gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer k
gboolean rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, struct rspamd_kv_element *elt);
/** Lookup an element inside kv storage */
-struct rspamd_kv_element* rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key);
+struct rspamd_kv_element* rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, time_t now);
/** Expire an element from kv storage */
gboolean rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key);
@@ -134,4 +139,20 @@ gboolean rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer k
/** Destroy kv storage */
void rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage);
+/**
+ * LRU expire
+ */
+struct rspamd_kv_expire* rspamd_lru_expire_new (guint queues);
+
+/**
+ * Ordinary hash
+ */
+struct rspamd_kv_cache* rspamd_kv_hash_new (void);
+
+/**
+ * Radix tree
+ */
+struct rspamd_kv_cache* rspamd_kv_radix_new (void);
+
+
#endif /* KVSTORAGE_H_ */