diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-11-01 19:48:08 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-11-01 19:48:08 +0300 |
commit | 7770c0c2ff70150fe33358a461fadf91e01ae23f (patch) | |
tree | 74ec845002dbd64b0fafed52e1b228616ffb71dc /src | |
parent | d194a7d577985268ff5f7f29379c90d694a8f230 (diff) | |
download | rspamd-7770c0c2ff70150fe33358a461fadf91e01ae23f.tar.gz rspamd-7770c0c2ff70150fe33358a461fadf91e01ae23f.zip |
* Add initial support of bdb backend
Diffstat (limited to 'src')
-rw-r--r-- | src/kvstorage.c | 19 | ||||
-rw-r--r-- | src/kvstorage.h | 2 | ||||
-rw-r--r-- | src/kvstorage_bdb.c | 376 | ||||
-rw-r--r-- | src/kvstorage_bdb.h | 38 | ||||
-rw-r--r-- | src/kvstorage_config.c | 32 | ||||
-rw-r--r-- | src/kvstorage_config.h | 7 |
6 files changed, 462 insertions, 12 deletions
diff --git a/src/kvstorage.c b/src/kvstorage.c index e628b93ad..ef774dd7d 100644 --- a/src/kvstorage.c +++ b/src/kvstorage.c @@ -227,17 +227,21 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru struct rspamd_kv_element* rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, time_t now) { - struct rspamd_kv_element *elt = NULL; + struct rspamd_kv_element *elt = NULL, *belt; /* First try to look at cache */ elt = storage->cache->lookup_func (storage->cache, key); /* Next look at the backend */ if (elt == NULL && storage->backend) { - elt = storage->backend->lookup_func (storage->backend, key); - if (elt) { + belt = storage->backend->lookup_func (storage->backend, key); + if (belt) { /* Put this element into cache */ - rspamd_kv_storage_insert_internal (storage, elt->key, elt->data, elt->size, elt->flags, elt->expire, &elt); + rspamd_kv_storage_insert_internal (storage, belt->key, belt->data, belt->size, belt->flags, + belt->expire, &elt); + if ((belt->flags & KV_ELT_DIRTY) == 0) { + g_free (belt); + } } } @@ -263,12 +267,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key) /* Now delete from backend */ if (storage->backend) { - if (elt == NULL) { - elt = storage->backend->delete_func (storage->backend, key); - } - else { - storage->backend->delete_func (storage->backend, key); - } + storage->backend->delete_func (storage->backend, key); } /* Notify expire */ if (elt) { diff --git a/src/kvstorage.h b/src/kvstorage.h index 4c2f0ef5c..00ff20e9a 100644 --- a/src/kvstorage.h +++ b/src/kvstorage.h @@ -45,7 +45,7 @@ typedef void (*backend_init)(struct rspamd_kv_backend *backend); typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt); typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt); typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key); -typedef struct rspamd_kv_element* (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key); +typedef void (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key); typedef void (*backend_destroy)(struct rspamd_kv_backend *backend); /* Callbacks for expire */ diff --git a/src/kvstorage_bdb.c b/src/kvstorage_bdb.c new file mode 100644 index 000000000..4524cb7bc --- /dev/null +++ b/src/kvstorage_bdb.c @@ -0,0 +1,376 @@ +/* Copyright (c) 2010, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "config.h" +#include "kvstorage.h" +#include "kvstorage_bdb.h" +#include "util.h" +#include <db.h> + +struct bdb_op { + struct rspamd_kv_element *elt; + enum { + BDB_OP_INSERT, + BDB_OP_DELETE, + BDB_OP_REPLACE + } op; +}; + +/* Main bdb structure */ +struct rspamd_bdb_backend { + backend_init init_func; /*< this callback is called on kv storage initialization */ + backend_insert insert_func; /*< this callback is called when element is inserted */ + backend_replace replace_func; /*< this callback is called when element is replaced */ + backend_lookup lookup_func; /*< this callback is used for lookup of element */ + backend_delete delete_func; /*< this callback is called when an element is deleted */ + backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ + DB_ENV *envp; /*< db environment */ + DB *dbp; /*< db pointer */ + gchar *filename; + gchar *dirname; + guint sync_ops; + GQueue *ops_queue; + GHashTable *ops_hash; + gboolean initialized; +}; + +/* Process single bdb operation */ +static gboolean +bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, DBC *cursorp, struct bdb_op *op) +{ + DBT db_key, db_data; + + memset (&db_key, 0, sizeof(DBT)); + memset (&db_data, 0, sizeof(DBT)); + db_key.size = strlen (op->elt->key); + db_key.data = op->elt->key; + db_data.size = op->elt->size; + db_data.data = op->elt; + + switch (op->op) { + case BDB_OP_INSERT: + case BDB_OP_REPLACE: + if (cursorp->put (cursorp, &db_key, &db_data, 0) != 0) { + return FALSE; + } + break; + case BDB_OP_DELETE: + db_data.flags = DB_DBT_USERMEM; + /* Set cursor */ + if (cursorp->get (cursorp, &db_key, &db_data, 0) != 0) { + return FALSE; + } + /* Del record */ + if (cursorp->del (cursorp, 0) != 0) { + return FALSE; + } + break; + } + + return TRUE; +} + +/* Process operations queue */ +static gboolean +bdb_process_queue (struct rspamd_bdb_backend *db) +{ + struct bdb_op *op; + DBC *cursorp; + DB_TXN *txn = NULL; + GList *cur, *tmp; + + /* Start transaction */ + if (db->envp->txn_begin (db->envp, NULL, &txn, 0) != 0) { + return FALSE; + } + if (db->dbp->cursor (db->dbp, txn, &cursorp, 0) != 0) { + txn->abort (txn); + return FALSE; + } + + cur = db->ops_queue->head; + while (cur) { + op = cur->data; + if (! bdb_process_single_op (db, txn, cursorp, op)) { + txn->abort (txn); + return FALSE; + } + cur = g_list_next (cur); + } + + /* Commit transaction */ + cursorp->close (cursorp); + if (txn->commit (txn, 0) != 0) { + return FALSE; + } + + /* Clean the queue */ + cur = db->ops_queue->head; + while (cur) { + op = cur->data; + tmp = cur; + g_hash_table_remove (db->ops_hash, op->elt->key); + if (op->op == BDB_OP_DELETE) { + /* Also clean memory */ + g_free (op->elt); + } + cur = g_list_next (cur); + g_queue_delete_link (db->ops_queue, tmp); + g_slice_free1 (sizeof (struct bdb_op), op); + } + + return TRUE; + +} + +/* Backend callbacks */ +static void +rspamd_bdb_init (struct rspamd_kv_backend *backend) +{ + struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; + guint32 flags; + + if (db_env_create (&db->envp, 0) != 0) { + /* Cannot create environment */ + goto err; + } + + flags = DB_CREATE | /* Create the environment if it does not already exist. */ + DB_INIT_TXN | /* Initialize transactions */ + DB_INIT_LOCK | /* Initialize locking. */ + DB_THREAD; /* Use threads */ + + if (db->envp->open (db->envp, db->dirname, flags, 0) != 0) { + /* Cannot open environment */ + goto err; + } + /* + * Configure db to perform deadlock detection internally, and to + * choose the transaction that has performed the least amount of + * writing to break the deadlock in the event that one is detected. + */ + db->envp->set_lk_detect(db->envp, DB_LOCK_MINWRITE); + + flags = DB_CREATE | DB_THREAD; + /* Create and open db pointer */ + if (db_create (&db->dbp, db->envp, 0) != 0) { + goto err; + } + + if (db->dbp->open (db->dbp, NULL, db->filename, NULL, DB_BTREE, flags, 0) != 0) { + goto err; + } + + db->initialized = TRUE; + + return; +err: + if (db->dbp != NULL) { + db->dbp->close (db->dbp, 0); + } + if (db->envp != NULL) { + db->envp->close (db->envp, 0); + } +} + +static gboolean +rspamd_bdb_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt) +{ + struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; + struct bdb_op *op; + + if (!db->initialized) { + return FALSE; + } + + op = g_slice_alloc (sizeof (struct bdb_op)); + op->op = BDB_OP_INSERT; + op->elt = elt; + elt->flags |= KV_ELT_DIRTY; + + g_queue_push_head (db->ops_queue, elt); + g_hash_table_insert (db->ops_hash, key, elt); + + if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { + return bdb_process_queue (db); + } + + return TRUE; +} + +static gboolean +rspamd_bdb_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt) +{ + struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; + struct bdb_op *op; + + if (!db->initialized) { + return FALSE; + } + + op = g_slice_alloc (sizeof (struct bdb_op)); + op->op = BDB_OP_REPLACE; + op->elt = elt; + elt->flags |= KV_ELT_DIRTY; + + g_queue_push_head (db->ops_queue, elt); + g_hash_table_insert (db->ops_hash, key, elt); + + if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { + return bdb_process_queue (db); + } + + return TRUE; +} + +static struct rspamd_kv_element* +rspamd_bdb_lookup (struct rspamd_kv_backend *backend, gpointer key) +{ + struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; + struct bdb_op *op; + DBC *cursorp; + DBT db_key, db_data; + struct rspamd_kv_element *elt = NULL; + + if (!db->initialized) { + return NULL; + } + /* First search in ops queue */ + if ((op = g_hash_table_lookup (db->ops_hash, key)) != NULL) { + if (op->op == BDB_OP_DELETE) { + /* To delete, so assume it as not found */ + return NULL; + } + return op->elt; + } + + /* Now try to search in bdb */ + if (db->dbp->cursor (db->dbp, NULL, &cursorp, 0) != 0) { + return NULL; + } + memset (&db_key, 0, sizeof(DBT)); + memset (&db_data, 0, sizeof(DBT)); + db_key.size = strlen (key); + db_key.data = key; + db_data.flags = DB_DBT_MALLOC; + + if (cursorp->get (cursorp, &db_key, &db_data, 0) == 0) { + elt = db_data.data; + } + + cursorp->close (cursorp); + return elt; +} + +static void +rspamd_bdb_delete (struct rspamd_kv_backend *backend, gpointer key) +{ + struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; + struct bdb_op *op; + struct rspamd_kv_element *elt; + + if (!db->initialized) { + return; + } + + if ((op = g_hash_table_lookup (db->ops_hash, key)) != NULL) { + return; + } + + elt = rspamd_bdb_lookup (backend, key); + if (elt == NULL) { + return; + } + op = g_slice_alloc (sizeof (struct bdb_op)); + op->op = BDB_OP_DELETE; + op->elt = elt; + elt->flags |= KV_ELT_DIRTY; + + g_queue_push_head (db->ops_queue, elt); + g_hash_table_insert (db->ops_hash, key, elt); + + if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { + bdb_process_queue (db); + } + + return; +} + +static void +rspamd_bdb_destroy (struct rspamd_kv_backend *backend) +{ + struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; + + if (db->initialized) { + if (db->dbp != NULL) { + db->dbp->close (db->dbp, 0); + } + if (db->envp != NULL) { + db->envp->close (db->envp, 0); + } + g_free (db->filename); + g_free (db->dirname); + g_queue_free (db->ops_queue); + g_hash_table_unref (db->ops_hash); + g_slice_free1 (sizeof (struct rspamd_bdb_backend), db); + } +} + +/* Create new bdb backend */ +struct rspamd_kv_backend * +rspamd_kv_bdb_new (const gchar *filename, guint sync_ops) +{ + struct rspamd_bdb_backend *new; + struct stat st; + gchar *dirname; + + if (filename == NULL) { + return NULL; + } + dirname = g_path_get_dirname (filename); + if (dirname == NULL || stat (dirname, &st) == -1 || !S_ISDIR (st.st_mode)) { + /* Inaccessible path */ + if (dirname != NULL) { + g_free (dirname); + } + return NULL; + } + + new = g_slice_alloc0 (sizeof (struct rspamd_bdb_backend)); + new->dirname = dirname; + new->filename = g_strdup (filename); + new->sync_ops = sync_ops; + new->ops_queue = g_queue_new (); + new->ops_hash = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal); + + /* Init callbacks */ + new->init_func = rspamd_bdb_init; + new->insert_func = rspamd_bdb_insert; + new->lookup_func = rspamd_bdb_lookup; + new->delete_func = rspamd_bdb_delete; + new->replace_func = rspamd_bdb_replace; + new->destroy_func = rspamd_bdb_destroy; + + return (struct rspamd_kv_backend *)new; +} diff --git a/src/kvstorage_bdb.h b/src/kvstorage_bdb.h new file mode 100644 index 000000000..c6dd925b5 --- /dev/null +++ b/src/kvstorage_bdb.h @@ -0,0 +1,38 @@ +/* Copyright (c) 2010, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#ifndef KVSTORAGE_BDB_H_ +#define KVSTORAGE_BDB_H_ + +#include "config.h" +#include "kvstorage.h" + +#ifdef WITH_DB + +/* Create new bdb backend */ +struct rspamd_kv_backend* rspamd_kv_bdb_new (const gchar *filename, guint sync_ops); + +#endif + +#endif /* KVSTORAGE_BDB_H_ */ diff --git a/src/kvstorage_config.c b/src/kvstorage_config.c index bbffb4799..2bf905cae 100644 --- a/src/kvstorage_config.c +++ b/src/kvstorage_config.c @@ -24,6 +24,9 @@ #include "kvstorage_config.h" #include "main.h" #include "cfg_xml.h" +#ifdef WITH_DB +#include "kvstorage_bdb.h" +#endif #define LRU_QUEUES 32 @@ -44,6 +47,8 @@ struct kvstorage_config_parser { KVSTORAGE_STATE_CACHE_MAX_ELTS, KVSTORAGE_STATE_CACHE_MAX_MEM, KVSTORAGE_STATE_BACKEND_TYPE, + KVSTORAGE_STATE_BACKEND_FILENAME, + KVSTORAGE_STATE_BACKEND_SYNC_OPS, KVSTORAGE_STATE_EXPIRE_TYPE, KVSTORAGE_STATE_ERROR } state; @@ -90,6 +95,11 @@ kvstorage_init_callback (const gpointer key, const gpointer value, gpointer unus case KVSTORAGE_TYPE_BACKEND_NULL: backend = NULL; break; +#ifdef WITH_DB + case KVSTORAGE_TYPE_BACKEND_BDB: + backend = rspamd_kv_bdb_new (kconf->backend.filename, kconf->backend.sync_ops); + break; +#endif } switch (kconf->expire.type) { @@ -170,6 +180,14 @@ void kvstorage_xml_start_element (GMarkupParseContext *context, kv_parser->state = KVSTORAGE_STATE_BACKEND_TYPE; kv_parser->cur_elt = "type"; } + else if (g_ascii_strcasecmp (element_name, "filename") == 0) { + kv_parser->state = KVSTORAGE_STATE_BACKEND_FILENAME; + kv_parser->cur_elt = "filename"; + } + else if (g_ascii_strcasecmp (element_name, "sync_ops") == 0) { + kv_parser->state = KVSTORAGE_STATE_BACKEND_SYNC_OPS; + kv_parser->cur_elt = "sync_ops"; + } else { if (*error == NULL) { *error = g_error_new (xml_error_quark (), XML_EXTRA_ELEMENT, "element %s is unexpected in backend definition", @@ -238,6 +256,8 @@ void kvstorage_xml_end_element (GMarkupParseContext *context, CHECK_TAG (KVSTORAGE_STATE_PARAM); break; case KVSTORAGE_STATE_BACKEND_TYPE: + case KVSTORAGE_STATE_BACKEND_FILENAME: + case KVSTORAGE_STATE_BACKEND_SYNC_OPS: CHECK_TAG (KVSTORAGE_STATE_BACKEND); break; case KVSTORAGE_STATE_EXPIRE_TYPE: @@ -342,6 +362,11 @@ void kvstorage_xml_text (GMarkupParseContext *context, if (g_ascii_strncasecmp (text, "null", MIN (text_len, sizeof ("null") - 1)) == 0) { kv_parser->current_storage->backend.type = KVSTORAGE_TYPE_BACKEND_NULL; } +#ifdef WITH_DB + else if (g_ascii_strncasecmp (text, "bdb", MIN (text_len, sizeof ("bdb") - 1)) == 0) { + kv_parser->current_storage->backend.type = KVSTORAGE_TYPE_BACKEND_BDB; + } +#endif else { if (*error == NULL) { *error = g_error_new (xml_error_quark (), XML_EXTRA_ELEMENT, "invalid backend type: %*s", (int)text_len, text); @@ -349,6 +374,13 @@ void kvstorage_xml_text (GMarkupParseContext *context, kv_parser->state = KVSTORAGE_STATE_ERROR; } break; + case KVSTORAGE_STATE_BACKEND_FILENAME: + kv_parser->current_storage->backend.filename = g_malloc (text_len + 1); + rspamd_strlcpy (kv_parser->current_storage->backend.filename, text, text_len + 1); + break; + case KVSTORAGE_STATE_BACKEND_SYNC_OPS: + kv_parser->current_storage->backend.sync_ops = parse_limit (text, text_len); + break; case KVSTORAGE_STATE_EXPIRE_TYPE: if (g_ascii_strncasecmp (text, "lru", MIN (text_len, sizeof ("lru") - 1)) == 0) { kv_parser->current_storage->expire.type = KVSTORAGE_TYPE_EXPIRE_LRU; diff --git a/src/kvstorage_config.h b/src/kvstorage_config.h index 567dd18be..ced149c4e 100644 --- a/src/kvstorage_config.h +++ b/src/kvstorage_config.h @@ -36,7 +36,10 @@ enum kvstorage_cache_type { /* Type of kvstorage backend */ enum kvstorage_backend_type { - KVSTORAGE_TYPE_BACKEND_NULL = 0 + KVSTORAGE_TYPE_BACKEND_NULL = 0, +#ifdef WITH_DB + KVSTORAGE_TYPE_BACKEND_BDB +#endif }; /* Type of kvstorage expire */ @@ -54,6 +57,8 @@ struct kvstorage_cache_config { /* Backend config */ struct kvstorage_backend_config { enum kvstorage_backend_type type; + gchar *filename; + guint sync_ops; }; |