Browse Source

* Add initial support of bdb backend

tags/0.4.5
Vsevolod Stakhov 12 years ago
parent
commit
7770c0c2ff
8 changed files with 468 additions and 14 deletions
  1. 2
    2
      CMakeLists.txt
  2. 4
    0
      lib/CMakeLists.txt
  3. 9
    10
      src/kvstorage.c
  4. 1
    1
      src/kvstorage.h
  5. 376
    0
      src/kvstorage_bdb.c
  6. 38
    0
      src/kvstorage_bdb.h
  7. 32
    0
      src/kvstorage_config.c
  8. 6
    1
      src/kvstorage_config.h

+ 2
- 2
CMakeLists.txt View File

@@ -444,7 +444,7 @@ IF(LIBJUDY_LIBRARY)
ENDIF(LIBJUDY_LIBRARY)

# Find libbd
FIND_LIBRARY(LIBDB_LIBRARY NAMES db-4 PATHS PATH_SUFFIXES lib64 lib
FIND_LIBRARY(LIBDB_LIBRARY NAMES db PATHS PATH_SUFFIXES lib64 lib
PATHS
~/Library/Frameworks
/Library/Frameworks
@@ -814,7 +814,7 @@ IF(LIBJUDY_LIBRARY)
TARGET_LINK_LIBRARIES(rspamd Judy)
ENDIF(LIBJUDY_LIBRARY)
IF(LIBDB_LIBRARY)
TARGET_LINK_LIBRARIES(rspamd db-4)
TARGET_LINK_LIBRARIES(rspamd db)
ENDIF(LIBDB_LIBRARY)
TARGET_LINK_LIBRARIES(rspamd event)
IF(HAVE_LIBEVENT2)

+ 4
- 0
lib/CMakeLists.txt View File

@@ -70,6 +70,10 @@ SET(RSPAMDLIBSRC ../src/binlog.c
../src/url.c
../src/util.c
../src/view.c)

IF(WITH_DB)
LIST(APPEND RSPAMDLIBSRC ../src/kvstorage_bdb.c)
ENDIF(WITH_DB)
ADD_LIBRARY(rspamdserver STATIC ${RSPAMDLIBSRC})
SET_TARGET_PROPERTIES(rspamdserver PROPERTIES LINKER_LANGUAGE C)

+ 9
- 10
src/kvstorage.c View File

@@ -227,17 +227,21 @@ rspamd_kv_storage_replace (struct rspamd_kv_storage *storage, gpointer key, stru
struct rspamd_kv_element*
rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, time_t now)
{
struct rspamd_kv_element *elt = NULL;
struct rspamd_kv_element *elt = NULL, *belt;

/* First try to look at cache */
elt = storage->cache->lookup_func (storage->cache, key);

/* Next look at the backend */
if (elt == NULL && storage->backend) {
elt = storage->backend->lookup_func (storage->backend, key);
if (elt) {
belt = storage->backend->lookup_func (storage->backend, key);
if (belt) {
/* Put this element into cache */
rspamd_kv_storage_insert_internal (storage, elt->key, elt->data, elt->size, elt->flags, elt->expire, &elt);
rspamd_kv_storage_insert_internal (storage, belt->key, belt->data, belt->size, belt->flags,
belt->expire, &elt);
if ((belt->flags & KV_ELT_DIRTY) == 0) {
g_free (belt);
}
}
}

@@ -263,12 +267,7 @@ rspamd_kv_storage_delete (struct rspamd_kv_storage *storage, gpointer key)

/* Now delete from backend */
if (storage->backend) {
if (elt == NULL) {
elt = storage->backend->delete_func (storage->backend, key);
}
else {
storage->backend->delete_func (storage->backend, key);
}
storage->backend->delete_func (storage->backend, key);
}
/* Notify expire */
if (elt) {

+ 1
- 1
src/kvstorage.h View File

@@ -45,7 +45,7 @@ typedef void (*backend_init)(struct rspamd_kv_backend *backend);
typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt);
typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt);
typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key);
typedef struct rspamd_kv_element* (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key);
typedef void (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key);
typedef void (*backend_destroy)(struct rspamd_kv_backend *backend);

/* Callbacks for expire */

+ 376
- 0
src/kvstorage_bdb.c View File

@@ -0,0 +1,376 @@
/* Copyright (c) 2010, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/


#include "config.h"
#include "kvstorage.h"
#include "kvstorage_bdb.h"
#include "util.h"
#include <db.h>

struct bdb_op {
struct rspamd_kv_element *elt;
enum {
BDB_OP_INSERT,
BDB_OP_DELETE,
BDB_OP_REPLACE
} op;
};

/* Main bdb structure */
struct rspamd_bdb_backend {
backend_init init_func; /*< this callback is called on kv storage initialization */
backend_insert insert_func; /*< this callback is called when element is inserted */
backend_replace replace_func; /*< this callback is called when element is replaced */
backend_lookup lookup_func; /*< this callback is used for lookup of element */
backend_delete delete_func; /*< this callback is called when an element is deleted */
backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */
DB_ENV *envp; /*< db environment */
DB *dbp; /*< db pointer */
gchar *filename;
gchar *dirname;
guint sync_ops;
GQueue *ops_queue;
GHashTable *ops_hash;
gboolean initialized;
};

/* Process single bdb operation */
static gboolean
bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, DBC *cursorp, struct bdb_op *op)
{
DBT db_key, db_data;

memset (&db_key, 0, sizeof(DBT));
memset (&db_data, 0, sizeof(DBT));
db_key.size = strlen (op->elt->key);
db_key.data = op->elt->key;
db_data.size = op->elt->size;
db_data.data = op->elt;

switch (op->op) {
case BDB_OP_INSERT:
case BDB_OP_REPLACE:
if (cursorp->put (cursorp, &db_key, &db_data, 0) != 0) {
return FALSE;
}
break;
case BDB_OP_DELETE:
db_data.flags = DB_DBT_USERMEM;
/* Set cursor */
if (cursorp->get (cursorp, &db_key, &db_data, 0) != 0) {
return FALSE;
}
/* Del record */
if (cursorp->del (cursorp, 0) != 0) {
return FALSE;
}
break;
}

return TRUE;
}

/* Process operations queue */
static gboolean
bdb_process_queue (struct rspamd_bdb_backend *db)
{
struct bdb_op *op;
DBC *cursorp;
DB_TXN *txn = NULL;
GList *cur, *tmp;

/* Start transaction */
if (db->envp->txn_begin (db->envp, NULL, &txn, 0) != 0) {
return FALSE;
}
if (db->dbp->cursor (db->dbp, txn, &cursorp, 0) != 0) {
txn->abort (txn);
return FALSE;
}

cur = db->ops_queue->head;
while (cur) {
op = cur->data;
if (! bdb_process_single_op (db, txn, cursorp, op)) {
txn->abort (txn);
return FALSE;
}
cur = g_list_next (cur);
}

/* Commit transaction */
cursorp->close (cursorp);
if (txn->commit (txn, 0) != 0) {
return FALSE;
}

/* Clean the queue */
cur = db->ops_queue->head;
while (cur) {
op = cur->data;
tmp = cur;
g_hash_table_remove (db->ops_hash, op->elt->key);
if (op->op == BDB_OP_DELETE) {
/* Also clean memory */
g_free (op->elt);
}
cur = g_list_next (cur);
g_queue_delete_link (db->ops_queue, tmp);
g_slice_free1 (sizeof (struct bdb_op), op);
}

return TRUE;

}

/* Backend callbacks */
static void
rspamd_bdb_init (struct rspamd_kv_backend *backend)
{
struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;
guint32 flags;

if (db_env_create (&db->envp, 0) != 0) {
/* Cannot create environment */
goto err;
}

flags = DB_CREATE | /* Create the environment if it does not already exist. */
DB_INIT_TXN | /* Initialize transactions */
DB_INIT_LOCK | /* Initialize locking. */
DB_THREAD; /* Use threads */

if (db->envp->open (db->envp, db->dirname, flags, 0) != 0) {
/* Cannot open environment */
goto err;
}
/*
* Configure db to perform deadlock detection internally, and to
* choose the transaction that has performed the least amount of
* writing to break the deadlock in the event that one is detected.
*/
db->envp->set_lk_detect(db->envp, DB_LOCK_MINWRITE);

flags = DB_CREATE | DB_THREAD;
/* Create and open db pointer */
if (db_create (&db->dbp, db->envp, 0) != 0) {
goto err;
}

if (db->dbp->open (db->dbp, NULL, db->filename, NULL, DB_BTREE, flags, 0) != 0) {
goto err;
}

db->initialized = TRUE;

return;
err:
if (db->dbp != NULL) {
db->dbp->close (db->dbp, 0);
}
if (db->envp != NULL) {
db->envp->close (db->envp, 0);
}
}

static gboolean
rspamd_bdb_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt)
{
struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;
struct bdb_op *op;

if (!db->initialized) {
return FALSE;
}

op = g_slice_alloc (sizeof (struct bdb_op));
op->op = BDB_OP_INSERT;
op->elt = elt;
elt->flags |= KV_ELT_DIRTY;

g_queue_push_head (db->ops_queue, elt);
g_hash_table_insert (db->ops_hash, key, elt);

if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
return bdb_process_queue (db);
}

return TRUE;
}

static gboolean
rspamd_bdb_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt)
{
struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;
struct bdb_op *op;

if (!db->initialized) {
return FALSE;
}

op = g_slice_alloc (sizeof (struct bdb_op));
op->op = BDB_OP_REPLACE;
op->elt = elt;
elt->flags |= KV_ELT_DIRTY;

g_queue_push_head (db->ops_queue, elt);
g_hash_table_insert (db->ops_hash, key, elt);

if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
return bdb_process_queue (db);
}

return TRUE;
}

static struct rspamd_kv_element*
rspamd_bdb_lookup (struct rspamd_kv_backend *backend, gpointer key)
{
struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;
struct bdb_op *op;
DBC *cursorp;
DBT db_key, db_data;
struct rspamd_kv_element *elt = NULL;

if (!db->initialized) {
return NULL;
}
/* First search in ops queue */
if ((op = g_hash_table_lookup (db->ops_hash, key)) != NULL) {
if (op->op == BDB_OP_DELETE) {
/* To delete, so assume it as not found */
return NULL;
}
return op->elt;
}

/* Now try to search in bdb */
if (db->dbp->cursor (db->dbp, NULL, &cursorp, 0) != 0) {
return NULL;
}
memset (&db_key, 0, sizeof(DBT));
memset (&db_data, 0, sizeof(DBT));
db_key.size = strlen (key);
db_key.data = key;
db_data.flags = DB_DBT_MALLOC;

if (cursorp->get (cursorp, &db_key, &db_data, 0) == 0) {
elt = db_data.data;
}

cursorp->close (cursorp);
return elt;
}

static void
rspamd_bdb_delete (struct rspamd_kv_backend *backend, gpointer key)
{
struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;
struct bdb_op *op;
struct rspamd_kv_element *elt;

if (!db->initialized) {
return;
}

if ((op = g_hash_table_lookup (db->ops_hash, key)) != NULL) {
return;
}

elt = rspamd_bdb_lookup (backend, key);
if (elt == NULL) {
return;
}
op = g_slice_alloc (sizeof (struct bdb_op));
op->op = BDB_OP_DELETE;
op->elt = elt;
elt->flags |= KV_ELT_DIRTY;

g_queue_push_head (db->ops_queue, elt);
g_hash_table_insert (db->ops_hash, key, elt);

if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
bdb_process_queue (db);
}

return;
}

static void
rspamd_bdb_destroy (struct rspamd_kv_backend *backend)
{
struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend;

if (db->initialized) {
if (db->dbp != NULL) {
db->dbp->close (db->dbp, 0);
}
if (db->envp != NULL) {
db->envp->close (db->envp, 0);
}
g_free (db->filename);
g_free (db->dirname);
g_queue_free (db->ops_queue);
g_hash_table_unref (db->ops_hash);
g_slice_free1 (sizeof (struct rspamd_bdb_backend), db);
}
}

/* Create new bdb backend */
struct rspamd_kv_backend *
rspamd_kv_bdb_new (const gchar *filename, guint sync_ops)
{
struct rspamd_bdb_backend *new;
struct stat st;
gchar *dirname;

if (filename == NULL) {
return NULL;
}
dirname = g_path_get_dirname (filename);
if (dirname == NULL || stat (dirname, &st) == -1 || !S_ISDIR (st.st_mode)) {
/* Inaccessible path */
if (dirname != NULL) {
g_free (dirname);
}
return NULL;
}

new = g_slice_alloc0 (sizeof (struct rspamd_bdb_backend));
new->dirname = dirname;
new->filename = g_strdup (filename);
new->sync_ops = sync_ops;
new->ops_queue = g_queue_new ();
new->ops_hash = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);

/* Init callbacks */
new->init_func = rspamd_bdb_init;
new->insert_func = rspamd_bdb_insert;
new->lookup_func = rspamd_bdb_lookup;
new->delete_func = rspamd_bdb_delete;
new->replace_func = rspamd_bdb_replace;
new->destroy_func = rspamd_bdb_destroy;

return (struct rspamd_kv_backend *)new;
}

+ 38
- 0
src/kvstorage_bdb.h View File

@@ -0,0 +1,38 @@
/* Copyright (c) 2010, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/


#ifndef KVSTORAGE_BDB_H_
#define KVSTORAGE_BDB_H_

#include "config.h"
#include "kvstorage.h"

#ifdef WITH_DB

/* Create new bdb backend */
struct rspamd_kv_backend* rspamd_kv_bdb_new (const gchar *filename, guint sync_ops);

#endif

#endif /* KVSTORAGE_BDB_H_ */

+ 32
- 0
src/kvstorage_config.c View File

@@ -24,6 +24,9 @@
#include "kvstorage_config.h"
#include "main.h"
#include "cfg_xml.h"
#ifdef WITH_DB
#include "kvstorage_bdb.h"
#endif

#define LRU_QUEUES 32

@@ -44,6 +47,8 @@ struct kvstorage_config_parser {
KVSTORAGE_STATE_CACHE_MAX_ELTS,
KVSTORAGE_STATE_CACHE_MAX_MEM,
KVSTORAGE_STATE_BACKEND_TYPE,
KVSTORAGE_STATE_BACKEND_FILENAME,
KVSTORAGE_STATE_BACKEND_SYNC_OPS,
KVSTORAGE_STATE_EXPIRE_TYPE,
KVSTORAGE_STATE_ERROR
} state;
@@ -90,6 +95,11 @@ kvstorage_init_callback (const gpointer key, const gpointer value, gpointer unus
case KVSTORAGE_TYPE_BACKEND_NULL:
backend = NULL;
break;
#ifdef WITH_DB
case KVSTORAGE_TYPE_BACKEND_BDB:
backend = rspamd_kv_bdb_new (kconf->backend.filename, kconf->backend.sync_ops);
break;
#endif
}

switch (kconf->expire.type) {
@@ -170,6 +180,14 @@ void kvstorage_xml_start_element (GMarkupParseContext *context,
kv_parser->state = KVSTORAGE_STATE_BACKEND_TYPE;
kv_parser->cur_elt = "type";
}
else if (g_ascii_strcasecmp (element_name, "filename") == 0) {
kv_parser->state = KVSTORAGE_STATE_BACKEND_FILENAME;
kv_parser->cur_elt = "filename";
}
else if (g_ascii_strcasecmp (element_name, "sync_ops") == 0) {
kv_parser->state = KVSTORAGE_STATE_BACKEND_SYNC_OPS;
kv_parser->cur_elt = "sync_ops";
}
else {
if (*error == NULL) {
*error = g_error_new (xml_error_quark (), XML_EXTRA_ELEMENT, "element %s is unexpected in backend definition",
@@ -238,6 +256,8 @@ void kvstorage_xml_end_element (GMarkupParseContext *context,
CHECK_TAG (KVSTORAGE_STATE_PARAM);
break;
case KVSTORAGE_STATE_BACKEND_TYPE:
case KVSTORAGE_STATE_BACKEND_FILENAME:
case KVSTORAGE_STATE_BACKEND_SYNC_OPS:
CHECK_TAG (KVSTORAGE_STATE_BACKEND);
break;
case KVSTORAGE_STATE_EXPIRE_TYPE:
@@ -342,6 +362,11 @@ void kvstorage_xml_text (GMarkupParseContext *context,
if (g_ascii_strncasecmp (text, "null", MIN (text_len, sizeof ("null") - 1)) == 0) {
kv_parser->current_storage->backend.type = KVSTORAGE_TYPE_BACKEND_NULL;
}
#ifdef WITH_DB
else if (g_ascii_strncasecmp (text, "bdb", MIN (text_len, sizeof ("bdb") - 1)) == 0) {
kv_parser->current_storage->backend.type = KVSTORAGE_TYPE_BACKEND_BDB;
}
#endif
else {
if (*error == NULL) {
*error = g_error_new (xml_error_quark (), XML_EXTRA_ELEMENT, "invalid backend type: %*s", (int)text_len, text);
@@ -349,6 +374,13 @@ void kvstorage_xml_text (GMarkupParseContext *context,
kv_parser->state = KVSTORAGE_STATE_ERROR;
}
break;
case KVSTORAGE_STATE_BACKEND_FILENAME:
kv_parser->current_storage->backend.filename = g_malloc (text_len + 1);
rspamd_strlcpy (kv_parser->current_storage->backend.filename, text, text_len + 1);
break;
case KVSTORAGE_STATE_BACKEND_SYNC_OPS:
kv_parser->current_storage->backend.sync_ops = parse_limit (text, text_len);
break;
case KVSTORAGE_STATE_EXPIRE_TYPE:
if (g_ascii_strncasecmp (text, "lru", MIN (text_len, sizeof ("lru") - 1)) == 0) {
kv_parser->current_storage->expire.type = KVSTORAGE_TYPE_EXPIRE_LRU;

+ 6
- 1
src/kvstorage_config.h View File

@@ -36,7 +36,10 @@ enum kvstorage_cache_type {

/* Type of kvstorage backend */
enum kvstorage_backend_type {
KVSTORAGE_TYPE_BACKEND_NULL = 0
KVSTORAGE_TYPE_BACKEND_NULL = 0,
#ifdef WITH_DB
KVSTORAGE_TYPE_BACKEND_BDB
#endif
};

/* Type of kvstorage expire */
@@ -54,6 +57,8 @@ struct kvstorage_cache_config {
/* Backend config */
struct kvstorage_backend_config {
enum kvstorage_backend_type type;
gchar *filename;
guint sync_ops;
};



Loading…
Cancel
Save