]> source.dussan.org Git - rspamd.git/commitdiff
* Add judy storage for fast caching.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 7 Nov 2011 15:15:54 +0000 (18:15 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 7 Nov 2011 15:15:54 +0000 (18:15 +0300)
Fix LRU expiration.

src/kvstorage.c
src/kvstorage.h
src/kvstorage_config.c
src/kvstorage_config.h

index e89c8e1c2f25197c38db54480791d8ace7ef1aaa..6b32603432ec4f49b3ec7b27a880c6a0b31f9cc8 100644 (file)
@@ -25,6 +25,9 @@
 #include "kvstorage.h"
 #include "main.h"
 #include "radix.h"
+#ifdef WITH_JUDY
+#include <Judy.h>
+#endif
 
 #define MAX_EXPIRE_STEPS 10
 
@@ -92,7 +95,7 @@ rspamd_kv_storage_insert_internal (struct rspamd_kv_storage *storage, gpointer k
                /* Now check limits */
                while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
                        if (storage->expire) {
-                               storage->expire->step_func (storage->expire, storage, time (NULL));
+                               storage->expire->step_func (storage->expire, storage, time (NULL), steps);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -145,7 +148,7 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key,
                /* Now check limits */
                while (storage->memory + len > storage->max_memory || storage->elts >= storage->max_elts) {
                        if (storage->expire) {
-                               storage->expire->step_func (storage->expire, storage, time (NULL));
+                               storage->expire->step_func (storage->expire, storage, time (NULL), steps);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -216,7 +219,7 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru
                /* Now check limits */
                while (storage->memory + elt->size > storage->max_memory) {
                        if (storage->expire) {
-                               storage->expire->step_func (storage->expire, storage, time (NULL));
+                               storage->expire->step_func (storage->expire, storage, time (NULL), steps);
                        }
                        else {
                                msg_warn ("<%s>: storage is full and no expire function is defined", storage->name);
@@ -423,9 +426,7 @@ struct rspamd_kv_lru_expire {
        expire_delete delete_func;                                      /*< this callback is called when an element is deleted */
        expire_destroy destroy_func;                            /*< this callback is used for destroying all elements inside expire */
 
-       guint queues;
-       TAILQ_HEAD (eltq, rspamd_kv_element) *heads;
-       guint *heads_elts;
+       TAILQ_HEAD (eltq, rspamd_kv_element) head;
 };
 
 /**
@@ -435,12 +436,9 @@ static void
 rspamd_lru_insert (struct rspamd_kv_expire *e, struct rspamd_kv_element *elt)
 {
        struct rspamd_kv_lru_expire                     *expire = (struct rspamd_kv_lru_expire *)e;
-       guint                                sel_head;
 
        /* Get a proper queue */
-       sel_head = elt->hash % expire->queues;
-       TAILQ_INSERT_HEAD (&expire->heads[sel_head], elt, entry);
-       expire->heads_elts[sel_head] ++;
+       TAILQ_INSERT_TAIL (&expire->head, elt, entry);
 }
 /**
  * Delete an element from expire queue
@@ -449,59 +447,53 @@ static void
 rspamd_lru_delete (struct rspamd_kv_expire *e, struct rspamd_kv_element *elt)
 {
        struct rspamd_kv_lru_expire                     *expire = (struct rspamd_kv_lru_expire *)e;
-       guint                                sel_head;
 
-       /* Get a proper queue */
-       sel_head = elt->hash % expire->queues;
        /* Unlink element */
-       TAILQ_REMOVE (&expire->heads[sel_head], elt, entry);
-       expire->heads_elts[sel_head] --;
+       TAILQ_REMOVE (&expire->head, elt, entry);
 }
 
 /**
  * Expire elements
  */
 static gboolean
-rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *storage, time_t now)
+rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *storage, time_t now, gboolean forced)
 {
-       guint                                                            i;
        struct rspamd_kv_lru_expire                     *expire = (struct rspamd_kv_lru_expire *)e;
-       struct rspamd_kv_element            *elt, *oldest_elt, *temp;
-       time_t                               min_diff = G_MAXLONG, diff;
+       struct rspamd_kv_element            *elt, *oldest_elt = NULL, *temp;
+       time_t                               diff;
        gboolean                             res = FALSE;
 
-       for (i = 0; i < expire->queues; i ++) {
-               elt = expire->heads[i].tqh_first;
-               if (elt && (elt->flags & (KV_ELT_PERSISTENT|KV_ELT_DIRTY)) == 0) {
-                       diff = elt->expire - (now - elt->age);
-                       if (diff > 0) {
-                               /* This element is not expired */
-                               if (diff < min_diff) {
-                                       min_diff = diff;
-                                       oldest_elt = elt;
+       elt = TAILQ_FIRST (&expire->head);
+       if (elt && (forced || (elt->flags & (KV_ELT_PERSISTENT|KV_ELT_DIRTY)) == 0)) {
+               diff = elt->expire - (now - elt->age);
+               if (diff > 0) {
+                       oldest_elt = elt;
+               }
+               else {
+                       /* This element is already expired */
+                       storage->cache->steal_func (storage->cache, elt);
+                       /* Free memory */
+                       g_slice_free1 (ELT_SIZE (elt), elt);
+                       storage->memory -= ELT_SIZE (oldest_elt);
+                       storage->elts --;
+                       TAILQ_REMOVE (&expire->head, elt, entry);
+                       res = TRUE;
+                       /* Check other elements in this queue */
+                       TAILQ_FOREACH_SAFE (elt, &expire->head, entry, temp) {
+                               if ((elt->flags & (KV_ELT_PERSISTENT|KV_ELT_DIRTY)) != 0 || elt->expire < (now - elt->age)) {
+                                       break;
                                }
-                       }
-                       else {
-                               /* This element is already expired */
+                               storage->memory -= ELT_SIZE (elt);
+                               storage->elts --;
                                storage->cache->steal_func (storage->cache, elt);
                                /* Free memory */
                                g_slice_free1 (ELT_SIZE (elt), elt);
-                               res = TRUE;
-                               /* Check other elements in this queue */
-                               TAILQ_FOREACH_SAFE (elt, &expire->heads[i], entry, temp) {
-                                       if ((elt->flags & (KV_ELT_PERSISTENT|KV_ELT_DIRTY)) != 0 || elt->expire < (now - elt->age)) {
-                                               break;
-                                       }
-                                       storage->cache->steal_func (storage->cache, elt);
-                                       /* Free memory */
-                                       g_slice_free1 (ELT_SIZE (elt), elt);
-                               }
-                               break;
+                               TAILQ_REMOVE (&expire->head, elt, entry);
                        }
                }
        }
 
-       if (!res) {
+       if (!res && oldest_elt != NULL) {
                storage->memory -= ELT_SIZE (oldest_elt);
                storage->elts --;
                storage->cache->steal_func (storage->cache, oldest_elt);
@@ -512,6 +504,7 @@ rspamd_lru_expire_step (struct rspamd_kv_expire *e, struct rspamd_kv_storage *st
                else {
                        g_slice_free1 (ELT_SIZE (oldest_elt), oldest_elt);
                }
+               TAILQ_REMOVE (&expire->head, oldest_elt, entry);
        }
 
        return TRUE;
@@ -525,8 +518,6 @@ rspamd_lru_destroy (struct rspamd_kv_expire *e)
 {
        struct rspamd_kv_lru_expire                     *expire = (struct rspamd_kv_lru_expire *)e;
 
-       g_slice_free1 (sizeof (struct eltq) * expire->queues, expire->heads);
-       g_slice_free1 (sizeof (guint) * expire->queues, expire->heads_elts);
        g_slice_free1 (sizeof (struct rspamd_kv_lru_expire), expire);
 }
 
@@ -534,19 +525,12 @@ rspamd_lru_destroy (struct rspamd_kv_expire *e)
  * Create new LRU cache
  */
 struct rspamd_kv_expire*
-rspamd_lru_expire_new (guint queues)
+rspamd_lru_expire_new ()
 {
        struct rspamd_kv_lru_expire                     *new;
-       guint                                                            i;
 
        new = g_slice_alloc (sizeof (struct rspamd_kv_lru_expire));
-       new->queues = queues;
-       new->heads = g_slice_alloc (sizeof (struct eltq) * queues);
-       new->heads_elts = g_slice_alloc0 (sizeof (guint) * queues);
-
-       for (i = 0; i < queues; i ++) {
-               TAILQ_INIT (&new->heads[i]);
-       }
+       TAILQ_INIT (&new->head);
 
        /* Set callbacks */
        new->init_func = NULL;
@@ -878,3 +862,173 @@ rspamd_kv_radix_new (void)
 
        return (struct rspamd_kv_cache *)new;
 }
+
+
+#ifdef WITH_JUDY
+/*
+ * KV cache hash table
+ */
+struct rspamd_kv_judy_cache {
+       cache_init init_func;                                           /*< this callback is called on kv storage initialization */
+       cache_insert insert_func;                                       /*< this callback is called when element is inserted */
+       cache_replace replace_func;                                     /*< this callback is called when element is replace */
+       cache_lookup lookup_func;                                       /*< this callback is used for lookup of element */
+       cache_delete delete_func;                                       /*< this callback is called when an element is deleted */
+       cache_steal steal_func;                                         /*< this callback is used to replace duplicates in cache */
+       cache_destroy destroy_func;                                     /*< this callback is used for destroying all elements inside cache */
+       Pvoid_t judy;
+};
+
+
+/**
+ * Lookup an item inside judy
+ */
+static struct rspamd_kv_element*
+rspamd_kv_judy_lookup (struct rspamd_kv_cache *c, gpointer key)
+{
+       struct rspamd_kv_judy_cache                     *cache = (struct rspamd_kv_judy_cache *)c;
+       struct rspamd_kv_element                        *elt = NULL, **pelt;
+
+       JHSG (pelt, cache->judy, key, strlen (key));
+       if (pelt != NULL) {
+               elt = *pelt;
+       }
+       return elt;
+}
+
+/**
+ * Delete an element from cache
+ */
+static struct rspamd_kv_element *
+rspamd_kv_judy_delete (struct rspamd_kv_cache *c, gpointer key)
+{
+       struct rspamd_kv_judy_cache                     *cache = (struct rspamd_kv_judy_cache *)c;
+       struct rspamd_kv_element            *elt;
+       gint                                                             rc;
+
+       elt = rspamd_kv_judy_lookup (c, key);
+       if (elt) {
+               JHSD (rc, cache->judy, ELT_KEY (elt), elt->keylen);
+       }
+       return elt;
+}
+
+/**
+ * Steal an element from cache
+ */
+static void
+rspamd_kv_judy_steal (struct rspamd_kv_cache *c, struct rspamd_kv_element *elt)
+{
+       struct rspamd_kv_judy_cache                     *cache = (struct rspamd_kv_judy_cache *)c;
+       gint                                                             rc;
+
+       JHSD (rc, cache->judy, ELT_KEY (elt), elt->keylen);
+}
+
+/**
+ * Insert an element inside cache
+ */
+static struct rspamd_kv_element*
+rspamd_kv_judy_insert (struct rspamd_kv_cache *c, gpointer key, gpointer value, gsize len)
+{
+       struct rspamd_kv_element                        *elt, **pelt;
+       struct rspamd_kv_judy_cache                     *cache = (struct rspamd_kv_judy_cache *)c;
+       guint                                                            keylen;
+
+       if ((elt = rspamd_kv_judy_lookup (c, key)) == NULL) {
+               keylen = strlen (key);
+               elt = g_slice_alloc0 (sizeof (struct rspamd_kv_element) + len + keylen + 1);
+               elt->age = time (NULL);
+               elt->keylen = keylen;
+               elt->size = len;
+               elt->hash = rspamd_strcase_hash (key);
+               memcpy (elt->data, key, keylen + 1);
+               memcpy (ELT_DATA (elt), value, len);
+               JHSI (pelt, cache->judy, ELT_KEY (elt), elt->keylen);
+               *pelt = elt;
+       }
+       else {
+               rspamd_kv_judy_steal (c, elt);
+               if ((elt->flags & KV_ELT_DIRTY) != 0) {
+                       elt->flags |= KV_ELT_NEED_FREE;
+               }
+               else {
+                       /* Free it by self */
+                       g_slice_free1 (ELT_SIZE (elt), elt);
+               }
+               keylen = strlen (key);
+               elt = g_slice_alloc0 (sizeof (struct rspamd_kv_element) + len + keylen + 1);
+               elt->age = time (NULL);
+               elt->keylen = keylen;
+               elt->size = len;
+               elt->hash = rspamd_strcase_hash (key);
+               memcpy (elt->data, key, keylen + 1);
+               memcpy (ELT_DATA (elt), value, len);
+               JHSI (pelt, cache->judy, ELT_KEY (elt), elt->keylen);
+               *pelt = elt;
+       }
+
+       return elt;
+}
+
+/**
+ * Replace an element inside cache
+ */
+static gboolean
+rspamd_kv_judy_replace (struct rspamd_kv_cache *c, gpointer key, struct rspamd_kv_element *elt)
+{
+       struct rspamd_kv_judy_cache                     *cache = (struct rspamd_kv_judy_cache *)c;
+       struct rspamd_kv_element                        *oldelt, **pelt;
+
+       if ((oldelt = rspamd_kv_judy_lookup (c, key)) != NULL) {
+               rspamd_kv_judy_steal (c, elt);
+
+               if ((oldelt->flags & KV_ELT_DIRTY) != 0) {
+                       oldelt->flags |= KV_ELT_NEED_FREE;
+               }
+               else {
+                       /* Free it by self */
+                       g_slice_free1 (ELT_SIZE (oldelt), oldelt);
+               }
+               JHSI (pelt, cache->judy, ELT_KEY (elt), elt->keylen);
+               *pelt = elt;
+               return TRUE;
+       }
+
+       return FALSE;
+}
+
+/**
+ * Destroy the whole cache
+ */
+static void
+rspamd_kv_judy_destroy (struct rspamd_kv_cache *c)
+{
+       struct rspamd_kv_judy_cache                     *cache = (struct rspamd_kv_judy_cache *)c;
+       glong                                                            bytes;
+
+       JHSFA (bytes, cache->judy);
+       g_slice_free1 (sizeof (struct rspamd_kv_judy_cache), cache);
+}
+
+/**
+ * Judy tree
+ */
+struct rspamd_kv_cache*
+rspamd_kv_judy_new (void)
+{
+       struct rspamd_kv_judy_cache                     *new;
+
+       new = g_slice_alloc (sizeof (struct rspamd_kv_judy_cache));
+       new->judy = NULL;
+       new->init_func = NULL;
+       new->insert_func = rspamd_kv_judy_insert;
+       new->lookup_func = rspamd_kv_judy_lookup;
+       new->replace_func = rspamd_kv_judy_replace;
+       new->delete_func = rspamd_kv_judy_delete;
+       new->steal_func = rspamd_kv_judy_steal;
+       new->destroy_func = rspamd_kv_judy_destroy;
+
+       return (struct rspamd_kv_cache *)new;
+}
+#endif
index e4ad31cc1aa8d8c56b96dbce8a5cf91260a764d6..5f48dbf39bc6c9fffeb8204f65fdfcb2caaed88e 100644 (file)
@@ -54,7 +54,7 @@ typedef void (*backend_destroy)(struct rspamd_kv_backend *backend);
 typedef void (*expire_init)(struct rspamd_kv_expire *expire);
 typedef void (*expire_insert)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt);
 typedef void (*expire_delete)(struct rspamd_kv_expire *expire, struct rspamd_kv_element *elt);
-typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage, time_t now);
+typedef gboolean (*expire_step)(struct rspamd_kv_expire *expire, struct rspamd_kv_storage *storage, time_t now, gboolean forced);
 typedef void (*expire_destroy)(struct rspamd_kv_expire *expire);
 
 
@@ -164,7 +164,7 @@ gboolean rspamd_kv_storage_get_array (struct rspamd_kv_storage *storage, gpointe
 /**
  * LRU expire
  */
-struct rspamd_kv_expire* rspamd_lru_expire_new (guint queues);
+struct rspamd_kv_expire* rspamd_lru_expire_new ();
 
 /**
  * Ordinary hash
@@ -176,5 +176,11 @@ struct rspamd_kv_cache* rspamd_kv_hash_new (void);
  */
 struct rspamd_kv_cache* rspamd_kv_radix_new (void);
 
+#ifdef WITH_JUDY
+/**
+ * Judy tree
+ */
+struct rspamd_kv_cache* rspamd_kv_judy_new (void);
+#endif
 
 #endif /* KVSTORAGE_H_ */
index 57a23d6abb17f8f1c78994aeddadb13576c64d5a..8097078a4e7fd3b8dffe5b5eedad7b80ffd152eb 100644 (file)
@@ -31,7 +31,6 @@
 #include "kvstorage_sqlite.h"
 #endif
 
-#define LRU_QUEUES 32
 
 /* Global hash of storages indexed by id */
 GHashTable *storages = NULL;
@@ -92,6 +91,14 @@ kvstorage_init_callback (const gpointer key, const gpointer value, gpointer unus
        case KVSTORAGE_TYPE_CACHE_RADIX:
                cache = rspamd_kv_radix_new ();
                break;
+#ifdef WITH_JUDY
+       case KVSTORAGE_TYPE_CACHE_JUDY:
+               cache = rspamd_kv_judy_new ();
+               break;
+#endif
+       default:
+               msg_err ("unknown cache type, internal error");
+               return;
        }
 
        switch (kconf->backend.type) {
@@ -113,7 +120,7 @@ kvstorage_init_callback (const gpointer key, const gpointer value, gpointer unus
 
        switch (kconf->expire.type) {
        case KVSTORAGE_TYPE_EXPIRE_LRU:
-               expire = rspamd_lru_expire_new (LRU_QUEUES);
+               expire = rspamd_lru_expire_new ();
                break;
        }
 
@@ -360,6 +367,11 @@ void kvstorage_xml_text       (GMarkupParseContext         *context,
                else if (g_ascii_strncasecmp (text, "radix", MIN (text_len, sizeof ("radix") - 1)) == 0) {
                        kv_parser->current_storage->cache.type = KVSTORAGE_TYPE_CACHE_RADIX;
                }
+#ifdef WITH_JUDY
+               else if (g_ascii_strncasecmp (text, "judy", MIN (text_len, sizeof ("judy") - 1)) == 0) {
+                       kv_parser->current_storage->cache.type = KVSTORAGE_TYPE_CACHE_JUDY;
+               }
+#endif
                else {
                        if (*error == NULL) {
                                *error = g_error_new (xml_error_quark (), XML_EXTRA_ELEMENT, "invalid cache type: %*s", (int)text_len, text);
index 7a3553f009eb608afd2600eeadc3ef802262c6b0..74adc0642ac666534cf5ec40b1ad92a72ee09cb1 100644 (file)
 /* Type of kvstorage cache */
 enum kvstorage_cache_type {
        KVSTORAGE_TYPE_CACHE_HASH,
-       KVSTORAGE_TYPE_CACHE_RADIX
+       KVSTORAGE_TYPE_CACHE_RADIX,
+#ifdef WITH_JUDY
+       KVSTORAGE_TYPE_CACHE_JUDY,
+#endif
+       KVSTORAGE_TYPE_MAX = 255
 };
 
 /* Type of kvstorage backend */