]> source.dussan.org Git - rspamd.git/commitdiff
* Implement sqlite3 backend for kvstorage.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 2 Nov 2011 14:49:09 +0000 (17:49 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 2 Nov 2011 14:49:09 +0000 (17:49 +0300)
CMakeLists.txt
config.h.in
lib/CMakeLists.txt
src/kvstorage.c
src/kvstorage.h
src/kvstorage_bdb.c
src/kvstorage_config.c
src/kvstorage_config.h
src/kvstorage_sqlite.c [new file with mode: 0644]
src/kvstorage_sqlite.h [new file with mode: 0644]

index 1f28c28495d491c749d96cfb7ff68f1dcc39d227..ed0cc130cbaf20e658d3af16f1a6a232ca44fd89 100644 (file)
@@ -317,6 +317,18 @@ IF(PCRE_LIBRARY_DIRS)
        LINK_DIRECTORIES("${PCRE_LIBRARY_DIRS}")
 ENDIF(PCRE_LIBRARY_DIRS)
 
+# Find optional sqlite3 support
+pkg_check_modules(SQLITE sqlite3)
+IF(SQLITE_FOUND)
+       SET(WITH_SQLITE 1)
+ENDIF(SQLITE_FOUND)
+IF(SQLITE_INCLUDE_DIRS)
+       INCLUDE_DIRECTORIES("${SQLITE_INCLUDE_DIRS}")
+ENDIF(SQLITE_INCLUDE_DIRS)
+IF(SQLITE_LIBRARY_DIRS)
+       LINK_DIRECTORIES("${SQLITE_LIBRARY_DIRS}")
+ENDIF(SQLITE_LIBRARY_DIRS)
+
 IF(ENABLE_STATIC MATCHES "ON")
        pkg_check_modules(GLIB2 REQUIRED glib-2.0>=2.12)
 ELSE(ENABLE_STATIC MATCHES "ON")
@@ -816,11 +828,14 @@ ENDIF(LIBJUDY_LIBRARY)
 IF(LIBDB_LIBRARY)
        TARGET_LINK_LIBRARIES(rspamd db)
 ENDIF(LIBDB_LIBRARY)
+IF(SQLITE_LIBRARIES)
+       TARGET_LINK_LIBRARIES(rspamd ${SQLITE_LIBRARIES})
+ENDIF(SQLITE_LIBRARIES)
 TARGET_LINK_LIBRARIES(rspamd event)
 IF(HAVE_LIBEVENT2)
        TARGET_LINK_LIBRARIES(rspamd event_pthreads)
 ENDIF(HAVE_LIBEVENT2)
-TARGET_LINK_LIBRARIES(rspamd pcre)
+TARGET_LINK_LIBRARIES(rspamd ${PCRE_LIBRARIES})
 
 TARGET_LINK_LIBRARIES(rspamd ${CMAKE_REQUIRED_LIBRARIES})
 TARGET_LINK_LIBRARIES(rspamd ${GLIB2_LIBRARIES})
index baa4beea8fbefbe3757ca2096d07c1a4dacc1214..433c6789b907bb5a1d468357d14ce633838ee5d2 100644 (file)
 
 #cmakedefine WITH_DB             1
 
+#cmakedefine WITH_SQLITE         1
+
 #cmakedefine WITH_GPERF_TOOLS    1
 
 #cmakedefine HAVE_ASM_PAUSE      1
index 9b211110c37df6fcfeaf4396a9d488bece98ea91..2b762b6c076145759bb3d7028a664c24fb42b389 100644 (file)
@@ -74,6 +74,9 @@ SET(RSPAMDLIBSRC ../src/binlog.c
 IF(WITH_DB)
        LIST(APPEND RSPAMDLIBSRC ../src/kvstorage_bdb.c)
 ENDIF(WITH_DB)
+IF(WITH_SQLITE)
+       LIST(APPEND RSPAMDLIBSRC ../src/kvstorage_sqlite.c)
+ENDIF(WITH_SQLITE)
                                
 ADD_LIBRARY(rspamdserver STATIC ${RSPAMDLIBSRC})
 SET_TARGET_PROPERTIES(rspamdserver PROPERTIES LINKER_LANGUAGE C)
index f1862f27500b5160ccd86a26752b465b264305a8..41b787dc5c3725f2a1e794ee67314a50aed21b7d 100644 (file)
@@ -157,6 +157,18 @@ rspamd_kv_storage_insert (struct rspamd_kv_storage *storage, gpointer key,
                }
        }
 
+       /* First try to search it in cache */
+       elt = storage->cache->lookup_func (storage->cache, key);
+       if (elt) {
+               if (elt->flags & KV_ELT_DIRTY) {
+                       /* Element is in backend storage queue */
+                       elt->flags |= KV_ELT_NEED_FREE;
+               }
+               else {
+                       g_slice_free1 (ELT_SIZE (elt), elt);
+               }
+       }
+
        /* Insert elt to the cache */
        elt = storage->cache->insert_func (storage->cache, key, data, len);
        if (elt == NULL) {
@@ -346,12 +358,12 @@ rspamd_kv_storage_set_array (struct rspamd_kv_storage *storage, gpointer key,
                return FALSE;
        }
        /* Get element size */
-       es = (guint *)elt->data;
+       es = (guint *)ELT_DATA (elt);
        if (elt_num > (elt->size - sizeof (guint)) / (*es)) {
                /* Invalid index */
                return FALSE;
        }
-       target = (gchar *)elt->data + sizeof (guint) + (*es) * elt_num;
+       target = (gchar *)ELT_DATA (elt) + sizeof (guint) + (*es) * elt_num;
        if (len != *es) {
                /* Invalid size */
                return FALSE;
@@ -383,12 +395,12 @@ rspamd_kv_storage_get_array (struct rspamd_kv_storage *storage, gpointer key,
                return FALSE;
        }
        /* Get element size */
-       es = (guint *)elt->data;
+       es = (guint *)ELT_DATA (elt);
        if (elt_num > (elt->size - sizeof (guint)) / (*es)) {
                /* Invalid index */
                return FALSE;
        }
-       target = elt->data + sizeof (guint) + (*es) * elt_num;
+       target = ELT_DATA (elt) + sizeof (guint) + (*es) * elt_num;
 
        *len = *es;
        *data = target;
index 5a5f8f8ffd799f73b18d7ce4a91500cf955e9432..ffff000db0831ac13163ff38e3920650767e5d3f 100644 (file)
@@ -61,7 +61,8 @@ enum rspamd_kv_flags {
        KV_ELT_ARRAY = 1 << 0,
        KV_ELT_PERSISTENT = 1 << 1,
        KV_ELT_DIRTY = 1 << 2,
-       KV_ELT_OUSTED = 1 << 3
+       KV_ELT_OUSTED = 1 << 3,
+       KV_ELT_NEED_FREE = 1 << 4
 };
 
 #define ELT_DATA(elt) (gchar *)(elt)->data + (elt)->keylen + 1
index 6e82c502db8e9f91cac2a6b303ea3d3cb9c0362f..87c343b223a03b9884f116e4a565db8cce4d2eb1 100644 (file)
@@ -58,7 +58,7 @@ struct rspamd_bdb_backend {
 
 /* Process single bdb operation */
 static gboolean
-bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, DBC *cursorp, struct bdb_op *op)
+bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, struct bdb_op *op)
 {
        DBT                                                                              db_key, db_data;
 
@@ -74,18 +74,14 @@ bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, DBC *cursorp,
        case BDB_OP_INSERT:
        case BDB_OP_REPLACE:
                db_data.flags = DB_DBT_USERMEM;
-               if (cursorp->put (cursorp, &db_key, &db_data, DB_KEYFIRST) != 0) {
+               if (db->dbp->put (db->dbp, NULL, &db_key, &db_data, 0) != 0) {
                        return FALSE;
                }
                break;
        case BDB_OP_DELETE:
                db_data.flags = DB_DBT_USERMEM;
                /* Set cursor */
-               if (cursorp->get (cursorp, &db_key, &db_data, DB_SET) != 0) {
-                       return FALSE;
-               }
-               /* Del record */
-               if (cursorp->del (cursorp, 0) != 0) {
+               if (db->dbp->del (db->dbp, NULL, &db_key, 0) != 0) {
                        return FALSE;
                }
                break;
@@ -100,40 +96,22 @@ static gboolean
 bdb_process_queue (struct rspamd_bdb_backend *db)
 {
        struct bdb_op                                                           *op;
-       DBC                                                                             *cursorp;
-       DB_TXN                                                                          *txn = NULL;
        GList                                                                           *cur;
 
-       /* Start transaction */
-       if (db->envp->txn_begin (db->envp, NULL, &txn, 0) != 0) {
-               return FALSE;
-       }
-       if (db->dbp->cursor (db->dbp, txn, &cursorp, 0) != 0) {
-               txn->abort (txn);
-               return FALSE;
-       }
-
        cur = db->ops_queue->head;
        while (cur) {
                op = cur->data;
-               if (! bdb_process_single_op (db, txn, cursorp, op)) {
-                       txn->abort (txn);
+               if (! bdb_process_single_op (db, NULL, op)) {
                        return FALSE;
                }
                cur = g_list_next (cur);
        }
 
-       /* Commit transaction */
-       cursorp->close (cursorp);
-       if (txn->commit (txn, 0) != 0) {
-               return FALSE;
-       }
-
        /* Clean the queue */
        cur = db->ops_queue->head;
        while (cur) {
                op = cur->data;
-               if (op->op == BDB_OP_DELETE) {
+               if (op->op == BDB_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) {
                        /* Also clean memory */
                        g_slice_free1 (ELT_SIZE (op->elt), op->elt);
                }
@@ -162,11 +140,8 @@ rspamd_bdb_init (struct rspamd_kv_backend *backend)
        }
 
        flags = DB_INIT_MPOOL |
-                       DB_RECOVER    |
                        DB_CREATE     |    /* Create the environment if it does not already exist. */
-                       DB_INIT_TXN   |    /* Initialize transactions */
                        DB_INIT_LOCK  |    /* Initialize locking. */
-                       DB_INIT_LOG   |    /* Initialize logging */
                        DB_THREAD;         /* Use threads */
 
        if ((ret = db->envp->open (db->envp, db->dirname, flags, 0)) != 0) {
@@ -178,9 +153,9 @@ rspamd_bdb_init (struct rspamd_kv_backend *backend)
         * choose the transaction that has performed the least amount of
         * writing to break the deadlock in the event that one is detected.
         */
-       db->envp->set_lk_detect(db->envp, DB_LOCK_MINWRITE);
+       db->envp->set_lk_detect (db->envp, DB_LOCK_DEFAULT);
 
-       flags = DB_CREATE | DB_THREAD | DB_AUTO_COMMIT;
+       flags = DB_CREATE | DB_THREAD;
        /* Create and open db pointer */
        if ((ret = db_create (&db->dbp, db->envp, 0)) != 0) {
                goto err;
@@ -259,7 +234,6 @@ rspamd_bdb_lookup (struct rspamd_kv_backend *backend, gpointer key)
 {
        struct rspamd_bdb_backend                                       *db = (struct rspamd_bdb_backend *)backend;
        struct bdb_op                                                           *op;
-       DBC                                                                             *cursorp;
        DBT                                                                              db_key, db_data;
        struct rspamd_kv_element                                        *elt = NULL;
 
@@ -275,22 +249,17 @@ rspamd_bdb_lookup (struct rspamd_kv_backend *backend, gpointer key)
                return op->elt;
        }
 
-       /* Now try to search in bdb */
-       if (db->dbp->cursor (db->dbp, NULL, &cursorp, 0) != 0) {
-               return NULL;
-       }
        memset (&db_key, 0, sizeof(DBT));
        memset (&db_data, 0, sizeof(DBT));
        db_key.size = strlen (key);
        db_key.data = key;
        db_data.flags = DB_DBT_MALLOC;
 
-       if (cursorp->get (cursorp, &db_key, &db_data, DB_SET) == 0) {
+       if (db->dbp->get (db->dbp, NULL, &db_key, &db_data, 0) == 0) {
                elt = db_data.data;
                elt->flags &= ~KV_ELT_DIRTY;
        }
 
-       cursorp->close (cursorp);
        return elt;
 }
 
index 0dbad53809b636ed9beabdfbcc1e0974f69f39a8..57a23d6abb17f8f1c78994aeddadb13576c64d5a 100644 (file)
@@ -27,6 +27,9 @@
 #ifdef WITH_DB
 #include "kvstorage_bdb.h"
 #endif
+#ifdef WITH_SQLITE
+#include "kvstorage_sqlite.h"
+#endif
 
 #define LRU_QUEUES 32
 
@@ -93,12 +96,18 @@ kvstorage_init_callback (const gpointer key, const gpointer value, gpointer unus
 
        switch (kconf->backend.type) {
        case KVSTORAGE_TYPE_BACKEND_NULL:
+       case KVSTORAGE_TYPE_BACKEND_MAX:
                backend = NULL;
                break;
 #ifdef WITH_DB
        case KVSTORAGE_TYPE_BACKEND_BDB:
                backend = rspamd_kv_bdb_new (kconf->backend.filename, kconf->backend.sync_ops);
                break;
+#endif
+#ifdef WITH_SQLITE
+       case KVSTORAGE_TYPE_BACKEND_SQLITE:
+               backend = rspamd_kv_sqlite_new (kconf->backend.filename, kconf->backend.sync_ops);
+               break;
 #endif
        }
 
@@ -366,6 +375,11 @@ void kvstorage_xml_text       (GMarkupParseContext         *context,
                else if (g_ascii_strncasecmp (text, "bdb", MIN (text_len, sizeof ("bdb") - 1)) == 0) {
                        kv_parser->current_storage->backend.type = KVSTORAGE_TYPE_BACKEND_BDB;
                }
+#endif
+#ifdef WITH_SQLITE
+               else if (g_ascii_strncasecmp (text, "sqlite", MIN (text_len, sizeof ("sqlite") - 1)) == 0) {
+                       kv_parser->current_storage->backend.type = KVSTORAGE_TYPE_BACKEND_SQLITE;
+               }
 #endif
                else {
                        if (*error == NULL) {
index b6f980b949fb4f32433a73b3103723ef0e77e203..7a3553f009eb608afd2600eeadc3ef802262c6b0 100644 (file)
@@ -38,8 +38,12 @@ enum kvstorage_cache_type {
 enum kvstorage_backend_type {
        KVSTORAGE_TYPE_BACKEND_NULL = 0,
 #ifdef WITH_DB
-       KVSTORAGE_TYPE_BACKEND_BDB
+       KVSTORAGE_TYPE_BACKEND_BDB,
 #endif
+#ifdef WITH_SQLITE
+       KVSTORAGE_TYPE_BACKEND_SQLITE,
+#endif
+       KVSTORAGE_TYPE_BACKEND_MAX = 255
 };
 
 /* Type of kvstorage expire */
diff --git a/src/kvstorage_sqlite.c b/src/kvstorage_sqlite.c
new file mode 100644 (file)
index 0000000..bde3ea1
--- /dev/null
@@ -0,0 +1,429 @@
+/* Copyright (c) 2010, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include "config.h"
+#include "kvstorage.h"
+#include "kvstorage_sqlite.h"
+#include "util.h"
+#include "main.h"
+#include <sqlite3.h>
+
+#define TABLE_NAME "kvstorage"
+#define CREATE_TABLE_SQL "CREATE TABLE " TABLE_NAME " (key TEXT CONSTRAINT _key PRIMARY KEY, data BLOB)"
+#define SET_SQL "INSERT OR REPLACE INTO " TABLE_NAME " (key, data) VALUES (?1, ?2)"
+#define GET_SQL "SELECT data FROM " TABLE_NAME " WHERE key = ?1"
+#define DELETE_SQL "DELETE FROM " TABLE_NAME " WHERE key = ?1"
+
+struct sqlite_op {
+       struct rspamd_kv_element *elt;
+       enum {
+               SQLITE_OP_INSERT,
+               SQLITE_OP_DELETE,
+               SQLITE_OP_REPLACE
+       } op;
+};
+
+/* Main sqlite structure */
+struct rspamd_sqlite_backend {
+       backend_init init_func;                                         /*< this callback is called on kv storage initialization */
+       backend_insert insert_func;                                     /*< this callback is called when element is inserted */
+       backend_replace replace_func;                           /*< this callback is called when element is replaced */
+       backend_lookup lookup_func;                                     /*< this callback is used for lookup of element */
+       backend_delete delete_func;                                     /*< this callback is called when an element is deleted */
+       backend_destroy destroy_func;                           /*< this callback is used for destroying all elements inside backend */
+       sqlite3 *dbp;
+       gchar *filename;
+       gchar *dirname;
+       guint sync_ops;
+       GQueue *ops_queue;
+       GHashTable *ops_hash;
+       gboolean initialized;
+       sqlite3_stmt *get_stmt;
+       sqlite3_stmt *set_stmt;
+       sqlite3_stmt *delete_stmt;
+};
+
+/* Process single sqlite operation */
+static gboolean
+sqlite_process_single_op (struct rspamd_sqlite_backend *db, struct sqlite_op *op)
+{
+       gboolean                                                                         res = FALSE;
+
+       op->elt->flags &= ~KV_ELT_DIRTY;
+       switch (op->op) {
+       case SQLITE_OP_INSERT:
+       case SQLITE_OP_REPLACE:
+               if (sqlite3_bind_text (db->set_stmt, 1, ELT_KEY (op->elt), op->elt->keylen, SQLITE_STATIC) == SQLITE_OK &&
+                               sqlite3_bind_blob (db->set_stmt, 2, op->elt, ELT_SIZE (op->elt), SQLITE_STATIC) == SQLITE_OK) {
+                       if (sqlite3_step (db->set_stmt) == SQLITE_DONE) {
+                               res = TRUE;
+                       }
+               }
+               sqlite3_reset (db->set_stmt);
+               break;
+       case SQLITE_OP_DELETE:
+               if (sqlite3_bind_text (db->delete_stmt, 1, ELT_KEY (op->elt), op->elt->keylen, SQLITE_STATIC) == SQLITE_OK) {
+                       if (sqlite3_step (db->delete_stmt) == SQLITE_DONE) {
+                               res = TRUE;
+                       }
+               }
+               sqlite3_reset (db->delete_stmt);
+               break;
+       }
+
+       if (!res) {
+               op->elt->flags |= KV_ELT_DIRTY;
+       }
+       return res;
+}
+
+/* Process operations queue */
+static gboolean
+sqlite_process_queue (struct rspamd_sqlite_backend *db)
+{
+       struct sqlite_op                                                        *op;
+       GList                                                                           *cur;
+
+       cur = db->ops_queue->head;
+       while (cur) {
+               op = cur->data;
+               if (! sqlite_process_single_op (db, op)) {
+                       return FALSE;
+               }
+               cur = g_list_next (cur);
+       }
+
+       /* Clean the queue */
+       cur = db->ops_queue->head;
+       while (cur) {
+               op = cur->data;
+               if (op->op == SQLITE_OP_DELETE || (op->elt->flags & KV_ELT_NEED_FREE) != 0) {
+                       /* Also clean memory */
+                       g_slice_free1 (ELT_SIZE (op->elt), op->elt);
+               }
+               g_slice_free1 (sizeof (struct sqlite_op), op);
+               cur = g_list_next (cur);
+       }
+
+       g_hash_table_remove_all (db->ops_hash);
+       g_queue_clear (db->ops_queue);
+
+       return TRUE;
+
+}
+
+/* Create table for kvstorage */
+static gboolean
+rspamd_sqlite_create_table (struct rspamd_sqlite_backend *db)
+{
+       gint                                                                             ret;
+       sqlite3_stmt                                                            *stmt = NULL;
+
+       ret = sqlite3_prepare_v2 (db->dbp, CREATE_TABLE_SQL, sizeof (CREATE_TABLE_SQL) - 1, &stmt, NULL);
+       if (ret != SQLITE_OK) {
+               if (stmt != NULL) {
+                       sqlite3_finalize (stmt);
+               }
+               return FALSE;
+       }
+
+       ret = sqlite3_step (stmt);
+       if (ret != SQLITE_DONE) {
+               sqlite3_finalize (stmt);
+               return FALSE;
+       }
+
+       sqlite3_finalize (stmt);
+       return TRUE;
+}
+
+/* Backend callbacks */
+static void
+rspamd_sqlite_init (struct rspamd_kv_backend *backend)
+{
+       struct rspamd_sqlite_backend                            *db = (struct rspamd_sqlite_backend *)backend;
+       guint32                                                                          flags;
+       gint                                                                             ret, r;
+       gchar                                                                            sqlbuf[BUFSIZ];
+       sqlite3_stmt                                                            *stmt = NULL;
+
+       /* Set multi-threaded mode */
+       if (sqlite3_config (SQLITE_CONFIG_MULTITHREAD) != SQLITE_OK) {
+               goto err;
+       }
+
+       flags = SQLITE_OPEN_READWRITE |
+                       SQLITE_OPEN_CREATE    |
+                       SQLITE_OPEN_NOMUTEX;
+
+       ret = sqlite3_open_v2 (db->filename, &db->dbp, flags, NULL);
+
+       if (ret != 0) {
+               goto err;
+       }
+       /* Now check if we have table */
+       r = rspamd_snprintf (sqlbuf, sizeof (sqlbuf), "SELECT * FROM " TABLE_NAME " LIMIT 1");
+       ret = sqlite3_prepare_v2 (db->dbp, sqlbuf, r, &stmt, NULL);
+
+       if (ret == SQLITE_ERROR) {
+               /* Try to create table */
+               if (!rspamd_sqlite_create_table (db)) {
+                       goto err;
+               }
+       }
+       else if (ret != SQLITE_OK) {
+               goto err;
+       }
+       /* We have table here, perform vacuum */
+       sqlite3_finalize (stmt);
+       r = rspamd_snprintf (sqlbuf, sizeof (sqlbuf), "VACUUM");
+       ret = sqlite3_prepare_v2 (db->dbp, sqlbuf, r, &stmt, NULL);
+       if (ret != SQLITE_OK) {
+               goto err;
+       }
+       /* Perform VACUUM */
+       sqlite3_step (stmt);
+       sqlite3_finalize (stmt);
+
+       /* Prepare required statements */
+       ret = sqlite3_prepare_v2 (db->dbp, GET_SQL, sizeof (GET_SQL) - 1, &db->get_stmt, NULL);
+       if (ret != SQLITE_OK) {
+               goto err;
+       }
+       ret = sqlite3_prepare_v2 (db->dbp, SET_SQL, sizeof (SET_SQL) - 1, &db->set_stmt, NULL);
+       if (ret != SQLITE_OK) {
+               goto err;
+       }
+       ret = sqlite3_prepare_v2 (db->dbp, DELETE_SQL, sizeof (DELETE_SQL) - 1, &db->delete_stmt, NULL);
+       if (ret != SQLITE_OK) {
+               goto err;
+       }
+
+       db->initialized = TRUE;
+
+       return;
+err:
+       if (db->dbp != NULL) {
+               msg_err ("error opening sqlite database: %d", ret);
+       }
+       if (stmt != NULL) {
+               msg_err ("error executing statement: %d", ret);
+               sqlite3_finalize (stmt);
+       }
+       if (db->get_stmt != NULL) {
+               sqlite3_finalize (db->get_stmt);
+       }
+       if (db->set_stmt != NULL) {
+               sqlite3_finalize (db->set_stmt);
+       }
+       if (db->delete_stmt != NULL) {
+               sqlite3_finalize (db->delete_stmt);
+       }
+}
+
+static gboolean
+rspamd_sqlite_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt)
+{
+       struct rspamd_sqlite_backend                            *db = (struct rspamd_sqlite_backend *)backend;
+       struct sqlite_op                                                        *op;
+
+       if (!db->initialized) {
+               return FALSE;
+       }
+
+       op = g_slice_alloc (sizeof (struct sqlite_op));
+       op->op = SQLITE_OP_INSERT;
+       op->elt = elt;
+       elt->flags |= KV_ELT_DIRTY;
+
+       g_queue_push_head (db->ops_queue, op);
+       g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
+
+       if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+               return sqlite_process_queue (db);
+       }
+
+       return TRUE;
+}
+
+static gboolean
+rspamd_sqlite_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt)
+{
+       struct rspamd_sqlite_backend                            *db = (struct rspamd_sqlite_backend *)backend;
+       struct sqlite_op                                                        *op;
+
+       if (!db->initialized) {
+               return FALSE;
+       }
+
+       op = g_slice_alloc (sizeof (struct sqlite_op));
+       op->op = SQLITE_OP_REPLACE;
+       op->elt = elt;
+       elt->flags |= KV_ELT_DIRTY;
+
+       g_queue_push_head (db->ops_queue, op);
+       g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
+
+       if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+               return sqlite_process_queue (db);
+       }
+
+       return TRUE;
+}
+
+static struct rspamd_kv_element*
+rspamd_sqlite_lookup (struct rspamd_kv_backend *backend, gpointer key)
+{
+       struct rspamd_sqlite_backend                            *db = (struct rspamd_sqlite_backend *)backend;
+       struct sqlite_op                                                        *op;
+       struct rspamd_kv_element                                        *elt = NULL;
+       gint                                                                             l;
+       gconstpointer                                                            d;
+
+       if (!db->initialized) {
+               return NULL;
+       }
+       /* First search in ops queue */
+       if ((op = g_hash_table_lookup (db->ops_hash, key)) != NULL) {
+               if (op->op == SQLITE_OP_DELETE) {
+                       /* To delete, so assume it as not found */
+                       return NULL;
+               }
+               return op->elt;
+       }
+
+       if (sqlite3_bind_text (db->get_stmt, 1, key, strlen (key), SQLITE_STATIC) == SQLITE_OK) {
+               if (sqlite3_step (db->get_stmt) == SQLITE_ROW) {
+                       l = sqlite3_column_bytes (db->get_stmt, 0);
+                       elt = g_malloc (l);
+                       d = sqlite3_column_blob (db->get_stmt, 0);
+                       /* Make temporary copy */
+                       memcpy (elt, d, l);
+               }
+       }
+
+       sqlite3_reset (db->get_stmt);
+       return elt;
+}
+
+static void
+rspamd_sqlite_delete (struct rspamd_kv_backend *backend, gpointer key)
+{
+       struct rspamd_sqlite_backend                            *db = (struct rspamd_sqlite_backend *)backend;
+       struct sqlite_op                                                        *op;
+       struct rspamd_kv_element                                        *elt;
+
+       if (!db->initialized) {
+               return;
+       }
+
+       if ((op = g_hash_table_lookup (db->ops_hash, key)) != NULL) {
+               op->op = SQLITE_OP_DELETE;
+               return;
+       }
+
+       elt = rspamd_sqlite_lookup (backend, key);
+       if (elt == NULL) {
+               return;
+       }
+       op = g_slice_alloc (sizeof (struct sqlite_op));
+       op->op = SQLITE_OP_DELETE;
+       op->elt = elt;
+       elt->flags |= KV_ELT_DIRTY;
+
+       g_queue_push_head (db->ops_queue, op);
+       g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
+
+       if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+               sqlite_process_queue (db);
+       }
+
+       return;
+}
+
+static void
+rspamd_sqlite_destroy (struct rspamd_kv_backend *backend)
+{
+       struct rspamd_sqlite_backend                            *db = (struct rspamd_sqlite_backend *)backend;
+
+       if (db->initialized) {
+               sqlite_process_queue (db);
+               if (db->get_stmt != NULL) {
+                       sqlite3_finalize (db->get_stmt);
+               }
+               if (db->set_stmt != NULL) {
+                       sqlite3_finalize (db->set_stmt);
+               }
+               if (db->delete_stmt != NULL) {
+                       sqlite3_finalize (db->delete_stmt);
+               }
+               sqlite3_close (db->dbp);
+               g_free (db->filename);
+               g_free (db->dirname);
+               g_queue_free (db->ops_queue);
+               g_hash_table_unref (db->ops_hash);
+               g_slice_free1 (sizeof (struct rspamd_sqlite_backend), db);
+       }
+}
+
+/* Create new sqlite backend */
+struct rspamd_kv_backend *
+rspamd_kv_sqlite_new (const gchar *filename, guint sync_ops)
+{
+       struct rspamd_sqlite_backend                            *new;
+       struct stat                                                              st;
+       gchar                                                                           *dirname;
+
+       if (filename == NULL) {
+               return NULL;
+       }
+
+       dirname = g_path_get_dirname (filename);
+       if (dirname == NULL || stat (dirname, &st) == -1 || !S_ISDIR (st.st_mode)) {
+               /* Inaccessible path */
+               if (dirname != NULL) {
+                       g_free (dirname);
+               }
+               msg_err ("invalid file: %s", filename);
+               return NULL;
+       }
+
+       new = g_slice_alloc0 (sizeof (struct rspamd_sqlite_backend));
+       new->dirname = dirname;
+       new->filename = g_strdup (filename);
+       new->sync_ops = sync_ops;
+       new->ops_queue = g_queue_new ();
+       new->ops_hash = g_hash_table_new (rspamd_strcase_hash, rspamd_strcase_equal);
+
+       /* Init callbacks */
+       new->init_func = rspamd_sqlite_init;
+       new->insert_func = rspamd_sqlite_insert;
+       new->lookup_func = rspamd_sqlite_lookup;
+       new->delete_func = rspamd_sqlite_delete;
+       new->replace_func = rspamd_sqlite_replace;
+       new->destroy_func = rspamd_sqlite_destroy;
+
+       return (struct rspamd_kv_backend *)new;
+}
+
diff --git a/src/kvstorage_sqlite.h b/src/kvstorage_sqlite.h
new file mode 100644 (file)
index 0000000..fda9902
--- /dev/null
@@ -0,0 +1,38 @@
+/* Copyright (c) 2010, Vsevolod Stakhov
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *       * Redistributions of source code must retain the above copyright
+ *         notice, this list of conditions and the following disclaimer.
+ *       * Redistributions in binary form must reproduce the above copyright
+ *         notice, this list of conditions and the following disclaimer in the
+ *         documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED ''AS IS'' AND ANY
+ * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL Rambler BE LIABLE FOR ANY
+ * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#ifndef KVSTORAGE_SQLITE_H_
+#define KVSTORAGE_SQLITE_H_
+
+#include "config.h"
+#include "kvstorage.h"
+
+#ifdef WITH_DB
+
+/* Create new bdb backend */
+struct rspamd_kv_backend* rspamd_kv_sqlite_new (const gchar *filename, guint sync_ops);
+
+#endif
+
+#endif /* KVSTORAGE_SQLITE_H_ */