aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-07 18:15:54 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-07 18:15:54 +0300
commitc28181fe44aefc2c4e31b0d76c50c2793dcc1702 (patch)
treecf9af7ee130e205d85546b3a14913da40afb05bd /src
parentd461b9f83ab8c2b6cc2a762f135e87687f51a8dd (diff)
downloadrspamd-c28181fe44aefc2c4e31b0d76c50c2793dcc1702.tar.gz
rspamd-c28181fe44aefc2c4e31b0d76c50c2793dcc1702.zip
* Add judy storage for fast caching.
Fix LRU expiration.
Diffstat (limited to 'src')
-rw-r--r--src/kvstorage.c262
-rw-r--r--src/kvstorage.h10
-rw-r--r--src/kvstorage_config.c16
-rw-r--r--src/kvstorage_config.h6
4 files changed, 235 insertions, 59 deletions
diff --git a/src/kvstorage.c b/src/kvstorage.c
index e89c8e1c2..6b3260343 100644
--- a/src/kvstorage.c
+++ b/src/kvstorage.c
@@ -25,6 +25,9 @@
#include "kvstorage.h"
#include "main.h"
#include "radix.h"
+#ifdef WITH_JUDY
+#include <Judy.h>
+#endif
#define MAX_EXPIRE_STEPS 10
@@ -92,7 +95,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
/* Now check limits */
while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
if (storage->expire) {
- storage->expire->step_func (storage->expire, storage, time (NULL));
+ storage->expire->step_func (storage->expire, storage, time (NULL), steps);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -145,7 +148,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key,
/* Now check limits */
while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
if (storage->expire) {
- storage->expire->step_func (storage->expire, storage, time (NULL));
+ storage->expire->step_func (storage->expire, storage, time (NULL), steps);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -216,7 +219,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru
/* Now check limits */
while (storage->memory + elt->size > storage->max_memory) {
if (storage->expire) {
- storage->expire->step_func (storage->expire, storage, time (NULL));
+ storage->expire->step_func (storage->expire, storage, time (NULL), steps);
}
else {
msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -423,9 +426,7 @@ struct rspamd_kv_lru_expire {
expire_delete delete_func; /*< this callback is called when an element is deleted */
expire_destroy destroy_func; /*< this callback is used for destroying all elements inside expire */
- guint queues;
- TAILQ_HEAD (eltq, rspamd_kv_element) *heads;
- guint *heads_elts;
+ TAILQ_HEAD (eltq, rspamd_kv_element) head;
};
/**
@@ -435,12 +436,9 @@ static void
rspamd_lru_insert (struct rspamd_kv_expire *e, struct rspamd_kv_element *elt)
{
struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e;
- guint sel_head;
/* Get a proper queue */
- sel_head = elt->hash % expire->queues;
- TAILQ_INSERT_HEAD (&expire->heads[sel_head], elt, entry);
- expire->heads_elts[sel_head] ++;
+ TAILQ_INSERT_TAIL (&expire->head, elt, entry);
}
/**
* Delete an element from expire queue
@@ -449,59 +447,53 @@ static void
rspamd_lru_delete (struct rspamd_kv_expire *e, struct rspamd_kv_element *elt)
{
struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e;
- guint sel_head;
- /* Get a proper queue */
- sel_head = elt->hash % expire->queues;
/* Unlink element */
- TAILQ_REMOVE (&expire->heads[sel_head], elt, entry);
- expire->heads_elts[sel_head] --;
+ TAILQ_REMOVE (&expire->head, elt, entry);
}
/**
* Expire elements
*/
static gboolean
-rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *storage, time_t now)
+rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *storage, time_t now, gboolean forced)
{
- guint i;
struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e;
- struct rspamd_kv_element *elt, *oldest_elt, *temp;
- time_t min_diff = G_MAXLONG, diff;
+ struct rspamd_kv_element *elt, *oldest_elt = NULL, *temp;
+ time_t diff;
gboolean res = FALSE;
- for (i = 0; i < expire->queues; i ++) {
- elt = expire->heads[i].tqh_first;
- if (elt && (elt->flags & (KV_ELT_PERSISTENT|KV_ELT_DIRTY)) == 0) {
- diff = elt->expire - (now - elt->age);
- if (diff > 0) {
- /* This element is not expired */
- if (diff < min_diff) {
- min_diff = diff;
- oldest_elt = elt;
+ elt = TAILQ_FIRST (&expire->head);
+ if (elt && (forced || (elt->flags & (KV_ELT_PERSISTENT|KV_ELT_DIRTY)) == 0)) {
+ diff = elt->expire - (now - elt->age);
+ if (diff > 0) {
+ oldest_elt = elt;
+ }
+ else {
+ /* This element is already expired */
+ storage->cache->steal_func (storage->cache, elt);
+ /* Free memory */
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ storage->memory -= ELT_SIZE (oldest_elt);
+ storage->elts --;
+ TAILQ_REMOVE (&expire->head, elt, entry);
+ res = TRUE;
+ /* Check other elements in this queue */
+ TAILQ_FOREACH_SAFE (elt, &expire->head, entry, temp) {
+ if ((elt->flags & (KV_ELT_PERSISTENT|KV_ELT_DIRTY)) != 0 || elt->expire < (now - elt->age)) {
+ break;
}
- }
- else {
- /* This element is already expired */
+ storage->memory -= ELT_SIZE (elt);
+ storage->elts --;
storage->cache->steal_func (storage->cache, elt);
/* Free memory */
g_slice_free1 (ELT_SIZE (elt), elt);
- res = TRUE;
- /* Check other elements in this queue */
- TAILQ_FOREACH_SAFE (elt, &expire->heads[i], entry, temp) {
- if ((elt->flags & (KV_ELT_PERSISTENT|KV_ELT_DIRTY)) != 0 || elt->expire < (now - elt->age)) {
- break;
- }
- storage->cache->steal_func (storage->cache, elt);
- /* Free memory */
- g_slice_free1 (ELT_SIZE (elt), elt);
- }
- break;
+ TAILQ_REMOVE (&expire->head, elt, entry);
}
}
}
- if (!res) {
+ if (!res && oldest_elt != NULL) {
storage->memory -= ELT_SIZE (oldest_elt);
storage->elts --;
storage->cache->steal_func (storage->cache, oldest_elt);
@@ -512,6 +504,7 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
else {
g_slice_free1 (ELT_SIZE (oldest_elt), oldest_elt);
}
+ TAILQ_REMOVE (&expire->head, oldest_elt, entry);
}
return TRUE;
@@ -525,8 +518,6 @@ rspamd_lru_destroy (struct rspamd_kv_expire *e)
{
struct rspamd_kv_lru_expire *expire = (struct rspamd_kv_lru_expire *)e;
- g_slice_free1 (sizeof (struct eltq) * expire->queues, expire->heads);
- g_slice_free1 (sizeof (guint) * expire->queues, expire->heads_elts);
g_slice_free1 (sizeof (struct rspamd_kv_lru_expire), expire);
}
@@ -534,19 +525,12 @@ rspamd_lru_destroy (struct rspamd_kv_expire *e)
* Create new LRU cache
*/
struct rspamd_kv_expire*
-rspamd_lru_expire_new (guint queues)
+rspamd_lru_expire_new ()
{
struct rspamd_kv_lru_expire *new;
- guint i;
new = g_slice_alloc (sizeof (struct rspamd_kv_lru_expire));
- new->queues = queues;
- new->heads = g_slice_alloc (sizeof (struct eltq) * queues);
- new->heads_elts = g_slice_alloc0 (sizeof (guint) * queues);
-
- for (i = 0; i < queues; i ++) {
- TAILQ_INIT (&new->heads[i]);
- }
+ TAILQ_INIT (&new->head);
/* Set callbacks */
new->init_func = NULL;
@@ -878,3 +862,173 @@ rspamd_kv_radix_new (void)
return (struct rspamd_kv_cache *)new;
}
+
+
+#ifdef WITH_JUDY
+/*
+ * KV cache hash table
+ */
+struct rspamd_kv_judy_cache {
+ cache_init init_func; /*< this callback is called on kv storage initialization */
+ cache_insert insert_func; /*< this callback is called when element is inserted */
+ cache_replace replace_func; /*< this callback is called when element is replace */
+ cache_lookup lookup_func; /*< this callback is used for lookup of element */
+ cache_delete delete_func; /*< this callback is called when an element is deleted */
+ cache_steal steal_func; /*< this callback is used to replace duplicates in cache */
+ cache_destroy destroy_func; /*< this callback is used for destroying all elements inside cache */
+ Pvoid_t judy;
+};
+
+
+/**
+ * Lookup an item inside judy
+ */
+static struct rspamd_kv_element*
+rspamd_kv_judy_lookup (struct rspamd_kv_cache *c, gpointer key)
+{
+ struct rspamd_kv_judy_cache *cache = (struct rspamd_kv_judy_cache *)c;
+ struct rspamd_kv_element *elt = NULL, **pelt;
+
+ JHSG (pelt, cache->judy, key, strlen (key));
+ if (pelt != NULL) {
+ elt = *pelt;
+ }
+ return elt;
+}
+
+/**
+ * Delete an element from cache
+ */
+static struct rspamd_kv_element *
+rspamd_kv_judy_delete (struct rspamd_kv_cache *c, gpointer key)
+{
+ struct rspamd_kv_judy_cache *cache = (struct rspamd_kv_judy_cache *)c;
+ struct rspamd_kv_element *elt;
+ gint rc;
+
+ elt = rspamd_kv_judy_lookup (c, key);
+ if (elt) {
+ JHSD (rc, cache->judy, ELT_KEY (elt), elt->keylen);
+ }
+ return elt;
+}
+
+/**
+ * Steal an element from cache
+ */
+static void
+rspamd_kv_judy_steal (struct rspamd_kv_cache *c, struct rspamd_kv_element *elt)
+{
+ struct rspamd_kv_judy_cache *cache = (struct rspamd_kv_judy_cache *)c;
+ gint rc;
+
+ JHSD (rc, cache->judy, ELT_KEY (elt), elt->keylen);
+}
+
+/**
+ * Insert an element inside cache
+ */
+static struct rspamd_kv_element*
+rspamd_kv_judy_insert (struct rspamd_kv_cache *c, gpointer key, gpointer value, gsize len)
+{
+ struct rspamd_kv_element *elt, **pelt;
+ struct rspamd_kv_judy_cache *cache = (struct rspamd_kv_judy_cache *)c;
+ guint keylen;
+
+ if ((elt = rspamd_kv_judy_lookup (c, key)) == NULL) {
+ keylen = strlen (key);
+ elt = g_slice_alloc0 (sizeof (struct rspamd_kv_element) + len + keylen + 1);
+ elt->age = time (NULL);
+ elt->keylen = keylen;
+ elt->size = len;
+ elt->hash = rspamd_strcase_hash (key);
+ memcpy (elt->data, key, keylen + 1);
+ memcpy (ELT_DATA (elt), value, len);
+ JHSI (pelt, cache->judy, ELT_KEY (elt), elt->keylen);
+ *pelt = elt;
+ }
+ else {
+ rspamd_kv_judy_steal (c, elt);
+ if ((elt->flags & KV_ELT_DIRTY) != 0) {
+ elt->flags |= KV_ELT_NEED_FREE;
+ }
+ else {
+ /* Free it by self */
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ }
+ keylen = strlen (key);
+ elt = g_slice_alloc0 (sizeof (struct rspamd_kv_element) + len + keylen + 1);
+ elt->age = time (NULL);
+ elt->keylen = keylen;
+ elt->size = len;
+ elt->hash = rspamd_strcase_hash (key);
+ memcpy (elt->data, key, keylen + 1);
+ memcpy (ELT_DATA (elt), value, len);
+ JHSI (pelt, cache->judy, ELT_KEY (elt), elt->keylen);
+ *pelt = elt;
+ }
+
+ return elt;
+}
+
+/**
+ * Replace an element inside cache
+ */
+static gboolean
+rspamd_kv_judy_replace (struct rspamd_kv_cache *c, gpointer key, struct rspamd_kv_element *elt)
+{
+ struct rspamd_kv_judy_cache *cache = (struct rspamd_kv_judy_cache *)c;
+ struct rspamd_kv_element *oldelt, **pelt;
+
+ if ((oldelt = rspamd_kv_judy_lookup (c, key)) != NULL) {
+ rspamd_kv_judy_steal (c, elt);
+
+ if ((oldelt->flags & KV_ELT_DIRTY) != 0) {
+ oldelt->flags |= KV_ELT_NEED_FREE;
+ }
+ else {
+ /* Free it by self */
+ g_slice_free1 (ELT_SIZE (oldelt), oldelt);
+ }
+ JHSI (pelt, cache->judy, ELT_KEY (elt), elt->keylen);
+ *pelt = elt;
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+/**
+ * Destroy the whole cache
+ */
+static void
+rspamd_kv_judy_destroy (struct rspamd_kv_cache *c)
+{
+ struct rspamd_kv_judy_cache *cache = (struct rspamd_kv_judy_cache *)c;
+ glong bytes;
+
+ JHSFA (bytes, cache->judy);
+ g_slice_free1 (sizeof (struct rspamd_kv_judy_cache), cache);
+}
+
+/**
+ * Judy tree
+ */
+struct rspamd_kv_cache*
+rspamd_kv_judy_new (void)
+{
+ struct rspamd_kv_judy_cache *new;
+
+ new = g_slice_alloc (sizeof (struct rspamd_kv_judy_cache));
+ new->judy = NULL;
+ new->init_func = NULL;
+ new->insert_func = rspamd_kv_judy_insert;
+ new->lookup_func = rspamd_kv_judy_lookup;
+ new->replace_func = rspamd_kv_judy_replace;
+ new->delete_func = rspamd_kv_judy_delete;
+ new->steal_func = rspamd_kv_judy_steal;
+ new->destroy_func = rspamd_kv_judy_destroy;
+
+ return (struct rspamd_kv_cache *)new;
+}
+#endif
diff --git a/src/kvstorage.h b/src/kvstorage.h
index e4ad31cc1..5f48dbf39 100644
--- a/src/kvstorage.h
+++ b/src/kvstorage.h
@@ -54,7 +54,7 @@ typedef void (*backend_destroy)(struct rspamd_kv_backend *backend);
typedef void (*expire_init)(struct rspamd_kv_expire *expire);
typedef void (*expire_insert)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt);
typedef void (*expire_delete)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt);
-typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage, time_t now);
+typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage, time_t now, gboolean forced);
typedef void (*expire_destroy)(struct rspamd_kv_expire *expire);
@@ -164,7 +164,7 @@ gboolean rspamd_kv_storage_get_array (struct rspamd_kv_storage *storage, gpointe
/**
* LRU expire
*/
-struct rspamd_kv_expire* rspamd_lru_expire_new (guint queues);
+struct rspamd_kv_expire* rspamd_lru_expire_new ();
/**
* Ordinary hash
@@ -176,5 +176,11 @@ struct rspamd_kv_cache* rspamd_kv_hash_new (void);
*/
struct rspamd_kv_cache* rspamd_kv_radix_new (void);
+#ifdef WITH_JUDY
+/**
+ * Judy tree
+ */
+struct rspamd_kv_cache* rspamd_kv_judy_new (void);
+#endif
#endif /* KVSTORAGE_H_ */
diff --git a/src/kvstorage_config.c b/src/kvstorage_config.c
index 57a23d6ab..8097078a4 100644
--- a/src/kvstorage_config.c
+++ b/src/kvstorage_config.c
@@ -31,7 +31,6 @@
#include "kvstorage_sqlite.h"
#endif
-#define LRU_QUEUES 32
/* Global hash of storages indexed by id */
GHashTable *storages = NULL;
@@ -92,6 +91,14 @@ kvstorage_init_callback (const gpointer key, const gpointer value, gpointer unus
case KVSTORAGE_TYPE_CACHE_RADIX:
cache = rspamd_kv_radix_new ();
break;
+#ifdef WITH_JUDY
+ case KVSTORAGE_TYPE_CACHE_JUDY:
+ cache = rspamd_kv_judy_new ();
+ break;
+#endif
+ default:
+ msg_err ("unknown cache type, internal error");
+ return;
}
switch (kconf->backend.type) {
@@ -113,7 +120,7 @@ kvstorage_init_callback (const gpointer key, const gpointer value, gpointer unus
switch (kconf->expire.type) {
case KVSTORAGE_TYPE_EXPIRE_LRU:
- expire = rspamd_lru_expire_new (LRU_QUEUES);
+ expire = rspamd_lru_expire_new ();
break;
}
@@ -360,6 +367,11 @@ void kvstorage_xml_text (GMarkupParseContext *context,
else if (g_ascii_strncasecmp (text, "radix", MIN (text_len, sizeof ("radix") - 1)) == 0) {
kv_parser->current_storage->cache.type = KVSTORAGE_TYPE_CACHE_RADIX;
}
+#ifdef WITH_JUDY
+ else if (g_ascii_strncasecmp (text, "judy", MIN (text_len, sizeof ("judy") - 1)) == 0) {
+ kv_parser->current_storage->cache.type = KVSTORAGE_TYPE_CACHE_JUDY;
+ }
+#endif
else {
if (*error == NULL) {
*error = g_error_new (xml_error_quark (), XML_EXTRA_ELEMENT, "invalid cache type: %*s", (int)text_len, text);
diff --git a/src/kvstorage_config.h b/src/kvstorage_config.h
index 7a3553f00..74adc0642 100644
--- a/src/kvstorage_config.h
+++ b/src/kvstorage_config.h
@@ -31,7 +31,11 @@
/* Type of kvstorage cache */
enum kvstorage_cache_type {
KVSTORAGE_TYPE_CACHE_HASH,
- KVSTORAGE_TYPE_CACHE_RADIX
+ KVSTORAGE_TYPE_CACHE_RADIX,
+#ifdef WITH_JUDY
+ KVSTORAGE_TYPE_CACHE_JUDY,
+#endif
+ KVSTORAGE_TYPE_MAX = 255
};
/* Type of kvstorage backend */