From f678526b2089826fba0299c88b9258bc48f748c4 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Wed, 2 Nov 2011 17:49:09 +0300 Subject: [PATCH] * Implement sqlite3 backend for kvstorage. --- CMakeLists.txt | 17 +- config.h.in | 2 + lib/CMakeLists.txt | 3 + src/kvstorage.c | 20 +- src/kvstorage.h | 3 +- src/kvstorage_bdb.c | 47 +---- src/kvstorage_config.c | 14 ++ src/kvstorage_config.h | 6 +- src/kvstorage_sqlite.c | 429 +++++++++++++++++++++++++++++++++++++++++ src/kvstorage_sqlite.h | 38 ++++ 10 files changed, 533 insertions(+), 46 deletions(-) create mode 100644 src/kvstorage_sqlite.c create mode 100644 src/kvstorage_sqlite.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 1f28c2849..ed0cc130c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -317,6 +317,18 @@ IF(PCRE_LIBRARY_DIRS) LINK_DIRECTORIES("${PCRE_LIBRARY_DIRS}") ENDIF(PCRE_LIBRARY_DIRS) +# Find optional sqlite3 support +pkg_check_modules(SQLITE sqlite3) +IF(SQLITE_FOUND) + SET(WITH_SQLITE 1) +ENDIF(SQLITE_FOUND) +IF(SQLITE_INCLUDE_DIRS) + INCLUDE_DIRECTORIES("${SQLITE_INCLUDE_DIRS}") +ENDIF(SQLITE_INCLUDE_DIRS) +IF(SQLITE_LIBRARY_DIRS) + LINK_DIRECTORIES("${SQLITE_LIBRARY_DIRS}") +ENDIF(SQLITE_LIBRARY_DIRS) + IF(ENABLE_STATIC MATCHES "ON") pkg_check_modules(GLIB2 REQUIRED glib-2.0>=2.12) ELSE(ENABLE_STATIC MATCHES "ON") @@ -816,11 +828,14 @@ ENDIF(LIBJUDY_LIBRARY) IF(LIBDB_LIBRARY) TARGET_LINK_LIBRARIES(rspamd db) ENDIF(LIBDB_LIBRARY) +IF(SQLITE_LIBRARIES) + TARGET_LINK_LIBRARIES(rspamd ${SQLITE_LIBRARIES}) +ENDIF(SQLITE_LIBRARIES) TARGET_LINK_LIBRARIES(rspamd event) IF(HAVE_LIBEVENT2) TARGET_LINK_LIBRARIES(rspamd event_pthreads) ENDIF(HAVE_LIBEVENT2) -TARGET_LINK_LIBRARIES(rspamd pcre) +TARGET_LINK_LIBRARIES(rspamd ${PCRE_LIBRARIES}) TARGET_LINK_LIBRARIES(rspamd ${CMAKE_REQUIRED_LIBRARIES}) TARGET_LINK_LIBRARIES(rspamd ${GLIB2_LIBRARIES}) diff --git a/config.h.in b/config.h.in index baa4beea8..433c6789b 100644 --- a/config.h.in +++ b/config.h.in @@ -155,6 +155,8 @@ #cmakedefine WITH_DB 1 +#cmakedefine WITH_SQLITE 1 + #cmakedefine WITH_GPERF_TOOLS 1 #cmakedefine HAVE_ASM_PAUSE 1 diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 9b211110c..2b762b6c0 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -74,6 +74,9 @@ SET(RSPAMDLIBSRC ../src/binlog.c IF(WITH_DB) LIST(APPEND RSPAMDLIBSRC ../src/kvstorage_bdb.c) ENDIF(WITH_DB) +IF(WITH_SQLITE) + LIST(APPEND RSPAMDLIBSRC ../src/kvstorage_sqlite.c) +ENDIF(WITH_SQLITE) ADD_LIBRARY(rspamdserver STATIC ${RSPAMDLIBSRC}) SET_TARGET_PROPERTIES(rspamdserver PROPERTIES LINKER_LANGUAGE C) diff --git a/src/kvstorage.c b/src/kvstorage.c index f1862f275..41b787dc5 100644 --- a/src/kvstorage.c +++ b/src/kvstorage.c @@ -157,6 +157,18 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key, } } + /* First try to search it in cache */ + elt = storage->cache->lookup_func (storage->cache, key); + if (elt) { + if (elt->flags & KV_ELT_DIRTY) { + /* Element is in backend storage queue */ + elt->flags |= KV_ELT_NEED_FREE; + } + else { + g_slice_free1 (ELT_SIZE (elt), elt); + } + } + /* Insert elt to the cache */ elt = storage->cache->insert_func (storage->cache, key, data, len); if (elt == NULL) { @@ -346,12 +358,12 @@ rspamd_kv_storage_set_array (struct rspamd_kv_storage *storage, gpointer key, return FALSE; } /* Get element size */ - es = (guint *)elt->data; + es = (guint *)ELT_DATA (elt); if (elt_num > (elt->size - sizeof (guint)) / (*es)) { /* Invalid index */ return FALSE; } - target = (gchar *)elt->data + sizeof (guint) + (*es) * elt_num; + target = (gchar *)ELT_DATA (elt) + sizeof (guint) + (*es) * elt_num; if (len != *es) { /* Invalid size */ return FALSE; @@ -383,12 +395,12 @@ rspamd_kv_storage_get_array (struct rspamd_kv_storage *storage, gpointer key, return FALSE; } /* Get element size */ - es = (guint *)elt->data; + es = (guint *)ELT_DATA (elt); if (elt_num > (elt->size - sizeof (guint)) / (*es)) { /* Invalid index */ return FALSE; } - target = elt->data + sizeof (guint) + (*es) * elt_num; + target = ELT_DATA (elt) + sizeof (guint) + (*es) * elt_num; *len = *es; *data = target; diff --git a/src/kvstorage.h b/src/kvstorage.h index 5a5f8f8ff..ffff000db 100644 --- a/src/kvstorage.h +++ b/src/kvstorage.h @@ -61,7 +61,8 @@ enum rspamd_kv_flags { KV_ELT_ARRAY = 1 << 0, KV_ELT_PERSISTENT = 1 << 1, KV_ELT_DIRTY = 1 << 2, - KV_ELT_OUSTED = 1 << 3 + KV_ELT_OUSTED = 1 << 3, + KV_ELT_NEED_FREE = 1 << 4 }; #define ELT_DATA(elt) (gchar *)(elt)->data + (elt)->keylen + 1 diff --git a/src/kvstorage_bdb.c b/src/kvstorage_bdb.c index 6e82c502d..87c343b22 100644 --- a/src/kvstorage_bdb.c +++ b/src/kvstorage_bdb.c @@ -58,7 +58,7 @@ struct rspamd_bdb_backend { /* Process single bdb operation */ static gboolean -bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, DBC *cursorp, struct bdb_op *op) +bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, struct bdb_op *op) { DBT db_key, db_data; @@ -74,18 +74,14 @@ bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, DBC *cursorp, case BDB_OP_INSERT: case BDB_OP_REPLACE: db_data.flags = DB_DBT_USERMEM; - if (cursorp->put (cursorp, &db_key, &db_data, DB_KEYFIRST) != 0) { + if (db->dbp->put (db->dbp, NULL, &db_key, &db_data, 0) != 0) { return FALSE; } break; case BDB_OP_DELETE: db_data.flags = DB_DBT_USERMEM; /* Set cursor */ - if (cursorp->get (cursorp, &db_key, &db_data, DB_SET) != 0) { - return FALSE; - } - /* Del record */ - if (cursorp->del (cursorp, 0) != 0) { + if (db->dbp->del (db->dbp, NULL, &db_key, 0) != 0) { return FALSE; } break; @@ -100,40 +96,22 @@ static gboolean bdb_process_queue (struct rspamd_bdb_backend *db) { struct bdb_op *op; - DBC *cursorp; - DB_TXN *txn = NULL; GList *cur; - /* Start transaction */ - if (db->envp->txn_begin (db->envp, NULL, &txn, 0) != 0) { - return FALSE; - } - if (db->dbp->cursor (db->dbp, txn, &cursorp, 0) != 0) { - txn->abort (txn); - return FALSE; - } - cur = db->ops_queue->head; while (cur) { op = cur->data; - if (! bdb_process_single_op (db, txn, cursorp, op)) { - txn->abort (txn); + if (! bdb_process_single_op (db, NULL, op)) { return FALSE; } cur = g_list_next (cur); } - /* Commit transaction */ - cursorp->close (cursorp); - if (txn->commit (txn, 0) != 0) { - return FALSE; - } - /* Clean the queue */ cur = db->ops_queue->head; while (cur) { op = cur->data; - if (op->op == BDB_OP_DELETE) { + if (op->op == BDB_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) { /* Also clean memory */ g_slice_free1 (ELT_SIZE (op->elt), op->elt); } @@ -162,11 +140,8 @@ rspamd_bdb_init (struct rspamd_kv_backend *backend) } flags = DB_INIT_MPOOL | - DB_RECOVER | DB_CREATE | /* Create the environment if it does not already exist. */ - DB_INIT_TXN | /* Initialize transactions */ DB_INIT_LOCK | /* Initialize locking. */ - DB_INIT_LOG | /* Initialize logging */ DB_THREAD; /* Use threads */ if ((ret = db->envp->open (db->envp, db->dirname, flags, 0)) != 0) { @@ -178,9 +153,9 @@ rspamd_bdb_init (struct rspamd_kv_backend *backend) * choose the transaction that has performed the least amount of * writing to break the deadlock in the event that one is detected. */ - db->envp->set_lk_detect(db->envp, DB_LOCK_MINWRITE); + db->envp->set_lk_detect (db->envp, DB_LOCK_DEFAULT); - flags = DB_CREATE | DB_THREAD | DB_AUTO_COMMIT; + flags = DB_CREATE | DB_THREAD; /* Create and open db pointer */ if ((ret = db_create (&db->dbp, db->envp, 0)) != 0) { goto err; @@ -259,7 +234,6 @@ rspamd_bdb_lookup (struct rspamd_kv_backend *backend, gpointer key) { struct rspamd_bdb_backend *db = (struct rspamd_bdb_backend *)backend; struct bdb_op *op; - DBC *cursorp; DBT db_key, db_data; struct rspamd_kv_element *elt = NULL; @@ -275,22 +249,17 @@ rspamd_bdb_lookup (struct rspamd_kv_backend *backend, gpointer key) return op->elt; } - /* Now try to search in bdb */ - if (db->dbp->cursor (db->dbp, NULL, &cursorp, 0) != 0) { - return NULL; - } memset (&db_key, 0, sizeof(DBT)); memset (&db_data, 0, sizeof(DBT)); db_key.size = strlen (key); db_key.data = key; db_data.flags = DB_DBT_MALLOC; - if (cursorp->get (cursorp, &db_key, &db_data, DB_SET) == 0) { + if (db->dbp->get (db->dbp, NULL, &db_key, &db_data, 0) == 0) { elt = db_data.data; elt->flags &= ~KV_ELT_DIRTY; } - cursorp->close (cursorp); return elt; } diff --git a/src/kvstorage_config.c b/src/kvstorage_config.c index 0dbad5380..57a23d6ab 100644 --- a/src/kvstorage_config.c +++ b/src/kvstorage_config.c @@ -27,6 +27,9 @@ #ifdef WITH_DB #include "kvstorage_bdb.h" #endif +#ifdef WITH_SQLITE +#include "kvstorage_sqlite.h" +#endif #define LRU_QUEUES 32 @@ -93,12 +96,18 @@ kvstorage_init_callback (const gpointer key, const gpointer value, gpointer unus switch (kconf->backend.type) { case KVSTORAGE_TYPE_BACKEND_NULL: + case KVSTORAGE_TYPE_BACKEND_MAX: backend = NULL; break; #ifdef WITH_DB case KVSTORAGE_TYPE_BACKEND_BDB: backend = rspamd_kv_bdb_new (kconf->backend.filename, kconf->backend.sync_ops); break; +#endif +#ifdef WITH_SQLITE + case KVSTORAGE_TYPE_BACKEND_SQLITE: + backend = rspamd_kv_sqlite_new (kconf->backend.filename, kconf->backend.sync_ops); + break; #endif } @@ -366,6 +375,11 @@ void kvstorage_xml_text (GMarkupParseContext *context, else if (g_ascii_strncasecmp (text, "bdb", MIN (text_len, sizeof ("bdb") - 1)) == 0) { kv_parser->current_storage->backend.type = KVSTORAGE_TYPE_BACKEND_BDB; } +#endif +#ifdef WITH_SQLITE + else if (g_ascii_strncasecmp (text, "sqlite", MIN (text_len, sizeof ("sqlite") - 1)) == 0) { + kv_parser->current_storage->backend.type = KVSTORAGE_TYPE_BACKEND_SQLITE; + } #endif else { if (*error == NULL) { diff --git a/src/kvstorage_config.h b/src/kvstorage_config.h index b6f980b94..7a3553f00 100644 --- a/src/kvstorage_config.h +++ b/src/kvstorage_config.h @@ -38,8 +38,12 @@ enum kvstorage_cache_type { enum kvstorage_backend_type { KVSTORAGE_TYPE_BACKEND_NULL = 0, #ifdef WITH_DB - KVSTORAGE_TYPE_BACKEND_BDB + KVSTORAGE_TYPE_BACKEND_BDB, #endif +#ifdef WITH_SQLITE + KVSTORAGE_TYPE_BACKEND_SQLITE, +#endif + KVSTORAGE_TYPE_BACKEND_MAX = 255 }; /* Type of kvstorage expire */ diff --git a/src/kvstorage_sqlite.c b/src/kvstorage_sqlite.c new file mode 100644 index 000000000..bde3ea1c6 --- /dev/null +++ b/src/kvstorage_sqlite.c @@ -0,0 +1,429 @@ +/* Copyright (c) 2010, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#include "config.h" +#include "kvstorage.h" +#include "kvstorage_sqlite.h" +#include "util.h" +#include "main.h" +#include + +#define TABLE_NAME "kvstorage" +#define CREATE_TABLE_SQL "CREATE TABLE " TABLE_NAME " (key TEXT CONSTRAINT _key PRIMARY KEY, data BLOB)" +#define SET_SQL "INSERT OR REPLACE INTO " TABLE_NAME " (key, data) VALUES (?1, ?2)" +#define GET_SQL "SELECT data FROM " TABLE_NAME " WHERE key = ?1" +#define DELETE_SQL "DELETE FROM " TABLE_NAME " WHERE key = ?1" + +struct sqlite_op { + struct rspamd_kv_element *elt; + enum { + SQLITE_OP_INSERT, + SQLITE_OP_DELETE, + SQLITE_OP_REPLACE + } op; +}; + +/* Main sqlite structure */ +struct rspamd_sqlite_backend { + backend_init init_func; /*< this callback is called on kv storage initialization */ + backend_insert insert_func; /*< this callback is called when element is inserted */ + backend_replace replace_func; /*< this callback is called when element is replaced */ + backend_lookup lookup_func; /*< this callback is used for lookup of element */ + backend_delete delete_func; /*< this callback is called when an element is deleted */ + backend_destroy destroy_func; /*< this callback is used for destroying all elements inside backend */ + sqlite3 *dbp; + gchar *filename; + gchar *dirname; + guint sync_ops; + GQueue *ops_queue; + GHashTable *ops_hash; + gboolean initialized; + sqlite3_stmt *get_stmt; + sqlite3_stmt *set_stmt; + sqlite3_stmt *delete_stmt; +}; + +/* Process single sqlite operation */ +static gboolean +sqlite_process_single_op (struct rspamd_sqlite_backend *db, struct sqlite_op *op) +{ + gboolean res = FALSE; + + op->elt->flags &= ~KV_ELT_DIRTY; + switch (op->op) { + case SQLITE_OP_INSERT: + case SQLITE_OP_REPLACE: + if (sqlite3_bind_text (db->set_stmt, 1, ELT_KEY (op->elt), op->elt->keylen, SQLITE_STATIC) == SQLITE_OK && + sqlite3_bind_blob (db->set_stmt, 2, op->elt, ELT_SIZE (op->elt), SQLITE_STATIC) == SQLITE_OK) { + if (sqlite3_step (db->set_stmt) == SQLITE_DONE) { + res = TRUE; + } + } + sqlite3_reset (db->set_stmt); + break; + case SQLITE_OP_DELETE: + if (sqlite3_bind_text (db->delete_stmt, 1, ELT_KEY (op->elt), op->elt->keylen, SQLITE_STATIC) == SQLITE_OK) { + if (sqlite3_step (db->delete_stmt) == SQLITE_DONE) { + res = TRUE; + } + } + sqlite3_reset (db->delete_stmt); + break; + } + + if (!res) { + op->elt->flags |= KV_ELT_DIRTY; + } + return res; +} + +/* Process operations queue */ +static gboolean +sqlite_process_queue (struct rspamd_sqlite_backend *db) +{ + struct sqlite_op *op; + GList *cur; + + cur = db->ops_queue->head; + while (cur) { + op = cur->data; + if (! sqlite_process_single_op (db, op)) { + return FALSE; + } + cur = g_list_next (cur); + } + + /* Clean the queue */ + cur = db->ops_queue->head; + while (cur) { + op = cur->data; + if (op->op == SQLITE_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) { + /* Also clean memory */ + g_slice_free1 (ELT_SIZE (op->elt), op->elt); + } + g_slice_free1 (sizeof (struct sqlite_op), op); + cur = g_list_next (cur); + } + + g_hash_table_remove_all (db->ops_hash); + g_queue_clear (db->ops_queue); + + return TRUE; + +} + +/* Create table for kvstorage */ +static gboolean +rspamd_sqlite_create_table (struct rspamd_sqlite_backend *db) +{ + gint ret; + sqlite3_stmt *stmt = NULL; + + ret = sqlite3_prepare_v2 (db->dbp, CREATE_TABLE_SQL, sizeof (CREATE_TABLE_SQL) - 1, &stmt, NULL); + if (ret != SQLITE_OK) { + if (stmt != NULL) { + sqlite3_finalize (stmt); + } + return FALSE; + } + + ret = sqlite3_step (stmt); + if (ret != SQLITE_DONE) { + sqlite3_finalize (stmt); + return FALSE; + } + + sqlite3_finalize (stmt); + return TRUE; +} + +/* Backend callbacks */ +static void +rspamd_sqlite_init (struct rspamd_kv_backend *backend) +{ + struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; + guint32 flags; + gint ret, r; + gchar sqlbuf[BUFSIZ]; + sqlite3_stmt *stmt = NULL; + + /* Set multi-threaded mode */ + if (sqlite3_config (SQLITE_CONFIG_MULTITHREAD) != SQLITE_OK) { + goto err; + } + + flags = SQLITE_OPEN_READWRITE | + SQLITE_OPEN_CREATE | + SQLITE_OPEN_NOMUTEX; + + ret = sqlite3_open_v2 (db->filename, &db->dbp, flags, NULL); + + if (ret != 0) { + goto err; + } + /* Now check if we have table */ + r = rspamd_snprintf (sqlbuf, sizeof (sqlbuf), "SELECT * FROM " TABLE_NAME " LIMIT 1"); + ret = sqlite3_prepare_v2 (db->dbp, sqlbuf, r, &stmt, NULL); + + if (ret == SQLITE_ERROR) { + /* Try to create table */ + if (!rspamd_sqlite_create_table (db)) { + goto err; + } + } + else if (ret != SQLITE_OK) { + goto err; + } + /* We have table here, perform vacuum */ + sqlite3_finalize (stmt); + r = rspamd_snprintf (sqlbuf, sizeof (sqlbuf), "VACUUM"); + ret = sqlite3_prepare_v2 (db->dbp, sqlbuf, r, &stmt, NULL); + if (ret != SQLITE_OK) { + goto err; + } + /* Perform VACUUM */ + sqlite3_step (stmt); + sqlite3_finalize (stmt); + + /* Prepare required statements */ + ret = sqlite3_prepare_v2 (db->dbp, GET_SQL, sizeof (GET_SQL) - 1, &db->get_stmt, NULL); + if (ret != SQLITE_OK) { + goto err; + } + ret = sqlite3_prepare_v2 (db->dbp, SET_SQL, sizeof (SET_SQL) - 1, &db->set_stmt, NULL); + if (ret != SQLITE_OK) { + goto err; + } + ret = sqlite3_prepare_v2 (db->dbp, DELETE_SQL, sizeof (DELETE_SQL) - 1, &db->delete_stmt, NULL); + if (ret != SQLITE_OK) { + goto err; + } + + db->initialized = TRUE; + + return; +err: + if (db->dbp != NULL) { + msg_err ("error opening sqlite database: %d", ret); + } + if (stmt != NULL) { + msg_err ("error executing statement: %d", ret); + sqlite3_finalize (stmt); + } + if (db->get_stmt != NULL) { + sqlite3_finalize (db->get_stmt); + } + if (db->set_stmt != NULL) { + sqlite3_finalize (db->set_stmt); + } + if (db->delete_stmt != NULL) { + sqlite3_finalize (db->delete_stmt); + } +} + +static gboolean +rspamd_sqlite_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt) +{ + struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; + struct sqlite_op *op; + + if (!db->initialized) { + return FALSE; + } + + op = g_slice_alloc (sizeof (struct sqlite_op)); + op->op = SQLITE_OP_INSERT; + op->elt = elt; + elt->flags |= KV_ELT_DIRTY; + + g_queue_push_head (db->ops_queue, op); + g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); + + if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { + return sqlite_process_queue (db); + } + + return TRUE; +} + +static gboolean +rspamd_sqlite_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt) +{ + struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; + struct sqlite_op *op; + + if (!db->initialized) { + return FALSE; + } + + op = g_slice_alloc (sizeof (struct sqlite_op)); + op->op = SQLITE_OP_REPLACE; + op->elt = elt; + elt->flags |= KV_ELT_DIRTY; + + g_queue_push_head (db->ops_queue, op); + g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); + + if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { + return sqlite_process_queue (db); + } + + return TRUE; +} + +static struct rspamd_kv_element* +rspamd_sqlite_lookup (struct rspamd_kv_backend *backend, gpointer key) +{ + struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; + struct sqlite_op *op; + struct rspamd_kv_element *elt = NULL; + gint l; + gconstpointer d; + + if (!db->initialized) { + return NULL; + } + /* First search in ops queue */ + if ((op = g_hash_table_lookup (db->ops_hash, key)) != NULL) { + if (op->op == SQLITE_OP_DELETE) { + /* To delete, so assume it as not found */ + return NULL; + } + return op->elt; + } + + if (sqlite3_bind_text (db->get_stmt, 1, key, strlen (key), SQLITE_STATIC) == SQLITE_OK) { + if (sqlite3_step (db->get_stmt) == SQLITE_ROW) { + l = sqlite3_column_bytes (db->get_stmt, 0); + elt = g_malloc (l); + d = sqlite3_column_blob (db->get_stmt, 0); + /* Make temporary copy */ + memcpy (elt, d, l); + } + } + + sqlite3_reset (db->get_stmt); + return elt; +} + +static void +rspamd_sqlite_delete (struct rspamd_kv_backend *backend, gpointer key) +{ + struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; + struct sqlite_op *op; + struct rspamd_kv_element *elt; + + if (!db->initialized) { + return; + } + + if ((op = g_hash_table_lookup (db->ops_hash, key)) != NULL) { + op->op = SQLITE_OP_DELETE; + return; + } + + elt = rspamd_sqlite_lookup (backend, key); + if (elt == NULL) { + return; + } + op = g_slice_alloc (sizeof (struct sqlite_op)); + op->op = SQLITE_OP_DELETE; + op->elt = elt; + elt->flags |= KV_ELT_DIRTY; + + g_queue_push_head (db->ops_queue, op); + g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op); + + if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { + sqlite_process_queue (db); + } + + return; +} + +static void +rspamd_sqlite_destroy (struct rspamd_kv_backend *backend) +{ + struct rspamd_sqlite_backend *db = (struct rspamd_sqlite_backend *)backend; + + if (db->initialized) { + sqlite_process_queue (db); + if (db->get_stmt != NULL) { + sqlite3_finalize (db->get_stmt); + } + if (db->set_stmt != NULL) { + sqlite3_finalize (db->set_stmt); + } + if (db->delete_stmt != NULL) { + sqlite3_finalize (db->delete_stmt); + } + sqlite3_close (db->dbp); + g_free (db->filename); + g_free (db->dirname); + g_queue_free (db->ops_queue); + g_hash_table_unref (db->ops_hash); + g_slice_free1 (sizeof (struct rspamd_sqlite_backend), db); + } +} + +/* Create new sqlite backend */ +struct rspamd_kv_backend * +rspamd_kv_sqlite_new (const gchar *filename, guint sync_ops) +{ + struct rspamd_sqlite_backend *new; + struct stat st; + gchar *dirname; + + if (filename == NULL) { + return NULL; + } + + dirname = g_path_get_dirname (filename); + if (dirname == NULL || stat (dirname, &st) == -1 || !S_ISDIR (st.st_mode)) { + /* Inaccessible path */ + if (dirname != NULL) { + g_free (dirname); + } + msg_err ("invalid file: %s", filename); + return NULL; + } + + new = g_slice_alloc0 (sizeof (struct rspamd_sqlite_backend)); + new->dirname = dirname; + new->filename = g_strdup (filename); + new->sync_ops = sync_ops; + new->ops_queue = g_queue_new (); + new->ops_hash = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal); + + /* Init callbacks */ + new->init_func = rspamd_sqlite_init; + new->insert_func = rspamd_sqlite_insert; + new->lookup_func = rspamd_sqlite_lookup; + new->delete_func = rspamd_sqlite_delete; + new->replace_func = rspamd_sqlite_replace; + new->destroy_func = rspamd_sqlite_destroy; + + return (struct rspamd_kv_backend *)new; +} + diff --git a/src/kvstorage_sqlite.h b/src/kvstorage_sqlite.h new file mode 100644 index 000000000..fda990257 --- /dev/null +++ b/src/kvstorage_sqlite.h @@ -0,0 +1,38 @@ +/* Copyright (c) 2010, Vsevolod Stakhov + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +#ifndef KVSTORAGE_SQLITE_H_ +#define KVSTORAGE_SQLITE_H_ + +#include "config.h" +#include "kvstorage.h" + +#ifdef WITH_DB + +/* Create new bdb backend */ +struct rspamd_kv_backend* rspamd_kv_sqlite_new (const gchar *filename, guint sync_ops); + +#endif + +#endif /* KVSTORAGE_SQLITE_H_ */ -- 2.39.5