diff options
-rw-r--r-- | src/kvstorage.c | 496 | ||||
-rw-r--r-- | src/kvstorage.h | 31 |
2 files changed, 510 insertions, 17 deletions
diff --git a/src/kvstorage.c b/src/kvstorage.c index 1371df9f5..2cfad9459 100644 --- a/src/kvstorage.c +++ b/src/kvstorage.c @@ -24,6 +24,7 @@ #include "config.h" #include "kvstorage.h" #include "main.h" +#include "radix.h" #define MAX_EXPIRE_STEPS 10 @@ -56,9 +57,63 @@ rspamd_kv_storage_new (gint id, const gchar *name, struct rspamd_kv_cache *cache rspamd_snprintf (new->name, sizeof ("18446744073709551616"), "%d", id); } + /* Init structures */ + if (new->cache->init_func) { + new->cache->init_func (new->cache); + } + if (new->backend && new->backend->init_func) { + new->backend->init_func (new->backend); + } + if (new->expire && new->expire->init_func) { + new->expire->init_func (new->expire); + } + return new; } +/** Internal insertion to the kv storage from backend */ +gboolean +rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, struct rspamd_kv_element *elt) +{ + gint steps = 0; + + /* Hard limit */ + if (elt->size > storage->max_memory) { + msg_info ("<%s>: trying to insert value of length %z while limit is %z", elt->size, storage->max_memory); + return FALSE; + } + + /* Now check limits */ + while (storage->memory + elt->size > storage->max_memory || storage->elts >= storage->max_elts) { + if (storage->expire) { + storage->expire->step_func (storage->expire, storage, time (NULL)); + } + else { + msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name); + } + if (++steps > MAX_EXPIRE_STEPS) { + msg_warn ("<%s>: cannot expire enough keys in storage", storage->name); + return FALSE; + } + } + + /* Insert elt to the cache */ + elt = storage->cache->insert_func (storage->cache, elt->key, elt->data, elt->size); + if (elt == NULL) { + return FALSE; + } + + /* Insert to the expire */ + if (storage->expire) { + storage->expire->insert_func (storage->expire, elt); + } + + storage->elts ++; + storage->memory += elt->size + sizeof (struct rspamd_kv_element); + + return TRUE; +} + /** Insert new element to the kv storage */ gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpointer data, gsize len, gint flags) @@ -76,7 +131,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpoin /* Now check limits */ while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) { if (storage->expire) { - storage->expire->step_func (storage->expire, storage); + storage->expire->step_func (storage->expire, storage, time (NULL)); } else { msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name); @@ -93,6 +148,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, gpoin return FALSE; } elt->flags = flags; + elt->size = len; /* Place to the backend */ if (storage->backend) { @@ -126,7 +182,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru /* Now check limits */ while (storage->memory + elt->size > storage->max_memory) { if (storage->expire) { - storage->expire->step_func (storage->expire, storage); + storage->expire->step_func (storage->expire, storage, time (NULL)); } else { msg_warn ("<%s>: storage %s is full and no expire function is defined", storage->name); @@ -150,7 +206,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru /** Lookup an element inside kv storage */ struct rspamd_kv_element* -rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key) +rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, time_t now) { struct rspamd_kv_element *elt = NULL; @@ -158,8 +214,20 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key) elt = storage->cache->lookup_func (storage->cache, key); /* Next look at the backend */ - if (storage->backend) { + if (elt == NULL && storage->backend) { elt = storage->backend->lookup_func (storage->backend, key); + if (elt) { + /* Put this element into cache */ + rspamd_kv_storage_insert_internal (storage, elt); + } + } + + if (elt && (elt->flags & KV_ELT_PERSISTENT) == 0) { + /* Check expiration */ + if (now - elt->age > elt->expire) { + rspamd_kv_storage_delete (storage, key); + elt = NULL; + } } return elt; @@ -169,35 +237,439 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key) gboolean rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key) { - gboolean res = TRUE; + struct rspamd_kv_element *elt; /* First delete key from cache */ - res = storage->cache->delete_func (storage->cache, key); + elt = storage->cache->delete_func (storage->cache, key); /* Now delete from backend */ if (storage->backend) { - res = storage->backend->delete_func (storage->backend, key); + if (elt == NULL) { + elt = storage->backend->delete_func (storage->backend, key); + } + else { + storage->backend->delete_func (storage->backend, key); + } } /* Notify expire */ - /* XXX: implement this */ + if (elt) { + storage->expire->delete_func (storage->expire, elt); + storage->elts --; + storage->memory -= elt->size; + } - return res; + return elt != NULL; } /** Destroy kv storage */ void rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage) { - if (storage->cache) { + if (storage->cache && storage->cache->destroy_func) { storage->cache->destroy_func (storage->cache); } - if (storage->backend) { + if (storage->backend && storage->backend->destroy_func) { storage->backend->destroy_func (storage->backend); } - if (storage->expire) { + if (storage->expire && storage->expire->destroy_func) { storage->expire->destroy_func (storage->expire); } g_free (storage->name); g_slice_free1 (sizeof (struct rspamd_kv_storage), storage); } + + +/** + * LRU expire functions + */ + +struct rspamd_kv_lru_expire { + expire_init init_func; /*< this callback is called on kv storage initialization */ + expire_insert insert_func; /*< this callback is called when element is inserted */ + expire_step step_func; /*< this callback is used when cache is full */ + expire_delete delete_func; /*< this callback is called when an element is deleted */ + expire_destroy destroy_func; /*< this callback is used for destroying all elements inside expire */ + + guint queues; + TAILQ_HEAD (eltq, rspamd_kv_element) *heads; + guint *heads_elts; +}; + +/** + * Insert an element into expire queue + */ +static void +rspamd_lru_insert (struct rspamd_kv_expire *e, struct rspamd_kv_element *elt) +{ + struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e; + guint sel_head; + + /* Get a proper queue */ + sel_head = elt->hash % expire->queues; + TAILQ_INSERT_HEAD (&expire->heads[sel_head], elt, entry); + expire->heads_elts[sel_head] ++; +} +/** + * Delete an element from expire queue + */ +static void +rspamd_lru_delete (struct rspamd_kv_expire *e, struct rspamd_kv_element *elt) +{ + struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e; + guint sel_head; + + /* Get a proper queue */ + sel_head = elt->hash % expire->queues; + /* Unlink element */ + TAILQ_REMOVE (&expire->heads[sel_head], elt, entry); + expire->heads_elts[sel_head] --; +} + +/** + * Expire elements + */ +static gboolean +rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *storage, time_t now) +{ + guint i; + struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e; + struct rspamd_kv_element *elt, *oldest_elt, *temp; + time_t min_diff = G_MAXLONG, diff; + gboolean res = FALSE; + + for (i = 0; i < expire->queues; i ++) { + elt = expire->heads[i].tqh_first; + if (elt && (elt->flags & KV_ELT_PERSISTENT) == 0) { + diff = elt->expire - (now - elt->age); + if (diff > 0) { + /* This element is not expired */ + if (diff < min_diff) { + min_diff = diff; + oldest_elt = elt; + } + } + else { + /* This element is already expired */ + rspamd_kv_storage_delete (storage, elt->key); + res = TRUE; + /* Check other elements in this queue */ + TAILQ_FOREACH_SAFE (elt, &expire->heads[i], entry, temp) { + if ((elt->flags & KV_ELT_PERSISTENT) != 0 || elt->expire < (now - elt->age)) { + break; + } + rspamd_kv_storage_delete (storage, elt->key); + } + break; + } + } + } + + if (!res) { + /* Oust the oldest element from cache */ + storage->cache->delete_func (storage->cache, oldest_elt->key); + oldest_elt->flags |= KV_ELT_OUSTED; + storage->memory -= oldest_elt->size + sizeof (*elt); + storage->elts --; + rspamd_lru_delete (e, oldest_elt); + } + + return TRUE; +} + +/** + * Destroy LRU expire memory + */ +static void +rspamd_lru_destroy (struct rspamd_kv_expire *e) +{ + struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e; + + g_slice_free1 (sizeof (struct eltq) * expire->queues, expire->heads); + g_slice_free1 (sizeof (guint) * expire->queues, expire->heads_elts); + g_slice_free1 (sizeof (struct rspamd_kv_lru_expire), expire); +} + +/** + * Create new LRU cache + */ +struct rspamd_kv_expire* +rspamd_lru_expire_new (guint queues) +{ + struct rspamd_kv_lru_expire *new; + guint i; + + new = g_slice_alloc (sizeof (struct rspamd_kv_lru_expire)); + new->queues = queues; + new->heads = g_slice_alloc (sizeof (struct eltq) * queues); + new->heads_elts = g_slice_alloc0 (sizeof (guint) * queues); + + for (i = 0; i < queues; i ++) { + TAILQ_INIT (&new->heads[i]); + } + + /* Set callbacks */ + new->init_func = NULL; + new->insert_func = rspamd_lru_insert; + new->delete_func = rspamd_lru_delete; + new->step_func = rspamd_lru_expire_step; + new->destroy_func = rspamd_lru_destroy; + + return (struct rspamd_kv_expire *)new; +} + +/* + * KV cache hash table + */ +struct rspamd_kv_hash_cache { + cache_init init_func; /*< this callback is called on kv storage initialization */ + cache_insert insert_func; /*< this callback is called when element is inserted */ + cache_replace replace_func; /*< this callback is called when element is replace */ + cache_lookup lookup_func; /*< this callback is used for lookup of element */ + cache_delete delete_func; /*< this callback is called when an element is deleted */ + cache_destroy destroy_func; /*< this callback is used for destroying all elements inside cache */ + GHashTable *hash; +}; + +/** + * Insert an element inside cache + */ +static struct rspamd_kv_element* +rspamd_kv_hash_insert (struct rspamd_kv_cache *c, gpointer key, gpointer value, gsize len) +{ + struct rspamd_kv_element *elt; + struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c; + + if ((elt = g_hash_table_lookup (cache->hash, key)) == NULL) { + elt = g_slice_alloc0 (sizeof (struct rspamd_kv_element) + len); + elt->age = time (NULL); + elt->key = key; + elt->size = len; + elt->hash = rspamd_strcase_hash (key); + memcpy (elt->data, value, len); + g_hash_table_insert (cache->hash, key, elt); + } + + return elt; +} + +/** + * Lookup an item inside hash + */ +static struct rspamd_kv_element* +rspamd_kv_hash_lookup (struct rspamd_kv_cache *c, gpointer key) +{ + struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c; + + return g_hash_table_lookup (cache->hash, key); +} + +/** + * Replace an element inside cache + */ +static gboolean +rspamd_kv_hash_replace (struct rspamd_kv_cache *c, gpointer key, struct rspamd_kv_element *elt) +{ + struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c; + + g_hash_table_replace (cache->hash, key, elt); + + return TRUE; +} + +/** + * Delete an element from cache + */ +static struct rspamd_kv_element * +rspamd_kv_hash_delete (struct rspamd_kv_cache *c, gpointer key) +{ + struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c; + struct rspamd_kv_element *elt; + + elt = g_hash_table_lookup (cache->hash, key); + if (elt) { + g_hash_table_steal (cache->hash, key); + } + return elt; +} + +/** + * Destroy the whole cache + */ +static void +rspamd_kv_hash_destroy (struct rspamd_kv_cache *c) +{ + struct rspamd_kv_hash_cache *cache = (struct rspamd_kv_hash_cache *)c; + + g_hash_table_destroy (cache->hash); + g_slice_free1 (sizeof (struct rspamd_kv_hash_cache), cache); +} + +/** + * Destroy kv_element structure + */ +static void +kv_elt_destroy_func (gpointer e) +{ + struct rspamd_kv_element *elt = e; + g_slice_free1 (sizeof (struct rspamd_kv_element) + elt->size, elt); +} + +/** + * Create new hash kv cache + */ +struct rspamd_kv_cache* +rspamd_kv_hash_new (void) +{ + struct rspamd_kv_hash_cache *new; + + new = g_slice_alloc (sizeof (struct rspamd_kv_hash_cache)); + new->hash = g_hash_table_new_full (rspamd_strcase_hash, rspamd_strcase_equal, NULL, kv_elt_destroy_func); + new->init_func = NULL; + new->insert_func = rspamd_kv_hash_insert; + new->lookup_func = rspamd_kv_hash_lookup; + new->replace_func = rspamd_kv_hash_replace; + new->delete_func = rspamd_kv_hash_delete; + new->destroy_func = rspamd_kv_hash_destroy; + + return (struct rspamd_kv_cache *)new; +} + +/* + * Radix cache hash table + */ +struct rspamd_kv_radix_cache { + cache_init init_func; /*< this callback is called on kv storage initialization */ + cache_insert insert_func; /*< this callback is called when element is inserted */ + cache_replace replace_func; /*< this callback is called when element is replace */ + cache_lookup lookup_func; /*< this callback is used for lookup of element */ + cache_delete delete_func; /*< this callback is called when an element is deleted */ + cache_destroy destroy_func; /*< this callback is used for destroying all elements inside cache */ + radix_tree_t *tree; +}; + +/** + * Validate a key for radix + */ +static guint32 +rspamd_kv_radix_validate (gpointer key) +{ + struct in_addr addr; + + if (inet_aton (key, &addr) == 0) { + return 0; + } + + return addr.s_addr; +} + +/** + * Insert an element inside cache + */ +static struct rspamd_kv_element* +rspamd_kv_radix_insert (struct rspamd_kv_cache *c, gpointer key, gpointer value, gsize len) +{ + struct rspamd_kv_element *elt; + struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c; + guint32 rkey = rspamd_kv_radix_validate (key); + + if (rkey == 0) { + return NULL; + } + elt = (struct rspamd_kv_element *)radix32tree_find (cache->tree, rkey); + if ((uintptr_t)elt == RADIX_NO_VALUE) { + elt = g_slice_alloc0 (sizeof (struct rspamd_kv_element) + len); + elt->age = time (NULL); + elt->key = key; + elt->size = len; + elt->hash = rkey; + memcpy (elt->data, value, len); + radix32tree_insert (cache->tree, rkey, 0xffffffff, (uintptr_t)elt); + } + + return elt; +} + +/** + * Lookup an item inside radix + */ +static struct rspamd_kv_element* +rspamd_kv_radix_lookup (struct rspamd_kv_cache *c, gpointer key) +{ + struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c; + guint32 rkey = rspamd_kv_radix_validate (key); + struct rspamd_kv_element *elt; + + elt = (struct rspamd_kv_element *)radix32tree_find (cache->tree, rkey); + if ((uintptr_t)elt == RADIX_NO_VALUE) { + return NULL; + } + + return elt; +} + +/** + * Replace an element inside cache + */ +static gboolean +rspamd_kv_radix_replace (struct rspamd_kv_cache *c, gpointer key, struct rspamd_kv_element *elt) +{ + struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c; + guint32 rkey = rspamd_kv_radix_validate (key); + + radix32tree_replace (cache->tree, rkey, 0xffffffff, (uintptr_t)elt); + + return TRUE; +} + +/** + * Delete an element from cache + */ +static struct rspamd_kv_element * +rspamd_kv_radix_delete (struct rspamd_kv_cache *c, gpointer key) +{ + struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c; + struct rspamd_kv_element *elt; + guint32 rkey = rspamd_kv_radix_validate (key); + + elt = (struct rspamd_kv_element *)radix32tree_find (cache->tree, rkey); + if ((uintptr_t)elt != RADIX_NO_VALUE) { + radix32tree_delete (cache->tree, rkey, 0xffffffff); + } + else { + return NULL; + } + return elt; +} + +/** + * Destroy the whole cache + */ +static void +rspamd_kv_radix_destroy (struct rspamd_kv_cache *c) +{ + struct rspamd_kv_radix_cache *cache = (struct rspamd_kv_radix_cache *)c; + + radix_tree_free (cache->tree); + g_slice_free1 (sizeof (struct rspamd_kv_radix_cache), cache); +} + +/** + * Create new radix kv cache + */ +struct rspamd_kv_cache* +rspamd_kv_radix_new (void) +{ + struct rspamd_kv_radix_cache *new; + + new = g_slice_alloc (sizeof (struct rspamd_kv_radix_cache)); + new->tree = radix_tree_create (); + new->init_func = NULL; + new->insert_func = rspamd_kv_radix_insert; + new->lookup_func = rspamd_kv_radix_lookup; + new->replace_func = rspamd_kv_radix_replace; + new->delete_func = rspamd_kv_radix_delete; + new->destroy_func = rspamd_kv_radix_destroy; + + return (struct rspamd_kv_cache *)new; +} diff --git a/src/kvstorage.h b/src/kvstorage.h index 6caa3f4a1..4bb7f20a7 100644 --- a/src/kvstorage.h +++ b/src/kvstorage.h @@ -45,30 +45,35 @@ typedef void (*backend_init)(struct rspamd_kv_backend *backend); typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt); typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt); typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key); -typedef gboolean (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key); +typedef struct rspamd_kv_element* (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key); typedef void (*backend_destroy)(struct rspamd_kv_backend *backend); /* Callbacks for expire */ typedef void (*expire_init)(struct rspamd_kv_expire *expire); typedef void (*expire_insert)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt); typedef void (*expire_delete)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt); -typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage); +typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage, time_t now); typedef void (*expire_destroy)(struct rspamd_kv_expire *expire); /* Flags of element */ enum rspamd_kv_flags { KV_ELT_ARRAY = 1 << 0, - KV_ELT_PERSISTEN = 1 << 1 + KV_ELT_PERSISTENT = 1 << 1, + KV_ELT_DIRTY = 1 << 2, + KV_ELT_OUSTED = 1 << 3 }; /* Common structures description */ struct rspamd_kv_element { time_t age; /*< age of element */ + guint32 expire; /*< expire of element */ enum rspamd_kv_flags flags; /*< element flags */ gsize size; /*< size of element */ - TAILQ_ENTRY(rspamd_kv_element) entry; /*< list entry */ + TAILQ_ENTRY (rspamd_kv_element) entry; /*< list entry */ + guint32 hash; /*< numeric hash */ + gpointer key; /*< pointer to key */ gchar data[1]; /*< expandable data */ }; @@ -126,7 +131,7 @@ gboolean rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer k gboolean rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, struct rspamd_kv_element *elt); /** Lookup an element inside kv storage */ -struct rspamd_kv_element* rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key); +struct rspamd_kv_element* rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, time_t now); /** Expire an element from kv storage */ gboolean rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key); @@ -134,4 +139,20 @@ gboolean rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer k /** Destroy kv storage */ void rspamd_kv_storage_destroy (struct rspamd_kv_storage *storage); +/** + * LRU expire + */ +struct rspamd_kv_expire* rspamd_lru_expire_new (guint queues); + +/** + * Ordinary hash + */ +struct rspamd_kv_cache* rspamd_kv_hash_new (void); + +/** + * Radix tree + */ +struct rspamd_kv_cache* rspamd_kv_radix_new (void); + + #endif /* KVSTORAGE_H_ */ |