]> source.dussan.org Git - rspamd.git/commitdiff
* Implement sync command for manual synchronization with backend.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 7 Nov 2011 00:20:42 +0000 (03:20 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 7 Nov 2011 00:20:42 +0000 (03:20 +0300)
src/kvstorage.h
src/kvstorage_bdb.c
src/kvstorage_server.c
src/kvstorage_server.h
src/kvstorage_sqlite.c

index fb584827458dd4c623dc7bf6afc0eb296caf7120..e4ad31cc1aa8d8c56b96dbce8a5cf91260a764d6 100644 (file)
@@ -47,6 +47,7 @@ typedef gboolean (*backend_insert)(struct rspamd_kv_backend *backend, gpointer k
 typedef gboolean (*backend_replace)(struct rspamd_kv_backend *backend, gpointer key, struct rspamd_kv_element *elt);
 typedef struct rspamd_kv_element* (*backend_lookup)(struct rspamd_kv_backend *backend, gpointer key);
 typedef void (*backend_delete)(struct rspamd_kv_backend *backend, gpointer key);
+typedef gboolean (*backend_sync)(struct rspamd_kv_backend *backend);
 typedef void (*backend_destroy)(struct rspamd_kv_backend *backend);
 
 /* Callbacks for expire */
@@ -99,6 +100,7 @@ struct rspamd_kv_backend {
        backend_replace replace_func;                           /*< this callback is called when element is replaced */
        backend_lookup lookup_func;                                     /*< this callback is used for lookup of element */
        backend_delete delete_func;                                     /*< this callback is called when an element is deleted */
+       backend_sync sync_func;                                         /*< this callback is called when backend need to be synced */
        backend_destroy destroy_func;                           /*< this callback is used for destroying all elements inside backend */
 };
 struct rspamd_kv_expire {
index 87c343b223a03b9884f116e4a565db8cce4d2eb1..280cb5ce96b0764ab75da102ca33eb1450e57ffd 100644 (file)
@@ -45,6 +45,7 @@ struct rspamd_bdb_backend {
        backend_replace replace_func;                           /*< this callback is called when element is replaced */
        backend_lookup lookup_func;                                     /*< this callback is used for lookup of element */
        backend_delete delete_func;                                     /*< this callback is called when an element is deleted */
+       backend_sync sync_func;                                         /*< this callback is called when backend need to be synced */
        backend_destroy destroy_func;                           /*< this callback is used for destroying all elements inside backend */
        DB_ENV *envp;                                                           /*< db environment */
        DB *dbp;                                                                        /*< db pointer */
@@ -93,8 +94,9 @@ bdb_process_single_op (struct rspamd_bdb_backend *db, DB_TXN *txn, struct bdb_op
 
 /* Process operations queue */
 static gboolean
-bdb_process_queue (struct rspamd_bdb_backend *db)
+bdb_process_queue (struct rspamd_kv_backend *backend)
 {
+       struct rspamd_bdb_backend                                       *db = (struct rspamd_bdb_backend *)backend;
        struct bdb_op                                                           *op;
        GList                                                                           *cur;
 
@@ -198,7 +200,7 @@ rspamd_bdb_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspam
        g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
 
        if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
-               return bdb_process_queue (db);
+               return bdb_process_queue (backend);
        }
 
        return TRUE;
@@ -223,7 +225,7 @@ rspamd_bdb_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspa
        g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
 
        if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
-               return bdb_process_queue (db);
+               return bdb_process_queue (backend);
        }
 
        return TRUE;
@@ -292,7 +294,7 @@ rspamd_bdb_delete (struct rspamd_kv_backend *backend, gpointer key)
        g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
 
        if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
-               bdb_process_queue (db);
+               bdb_process_queue (backend);
        }
 
        return;
@@ -304,7 +306,7 @@ rspamd_bdb_destroy (struct rspamd_kv_backend *backend)
        struct rspamd_bdb_backend                                       *db = (struct rspamd_bdb_backend *)backend;
 
        if (db->initialized) {
-               bdb_process_queue (db);
+               bdb_process_queue (backend);
                if (db->dbp != NULL) {
                        db->dbp->close (db->dbp, 0);
                }
@@ -353,6 +355,7 @@ rspamd_kv_bdb_new (const gchar *filename, guint sync_ops)
        new->lookup_func = rspamd_bdb_lookup;
        new->delete_func = rspamd_bdb_delete;
        new->replace_func = rspamd_bdb_replace;
+       new->sync_func = bdb_process_queue;
        new->destroy_func = rspamd_bdb_destroy;
 
        return (struct rspamd_kv_backend *)new;
index c6190454c3a55cc3579d867b9d3557449dedaa2e..56f25ed976afdb2361259f09c3fd811512f83d76 100644 (file)
@@ -199,16 +199,22 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
                                                }
                                        }
                                        else if (p - c == 4) {
-                                               if (memcmp (c, "quit", 4) == 0) {
+                                               if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
                                                        session->command = KVSTORAGE_CMD_QUIT;
                                                        state = 100;
                                                        continue;
                                                }
+                                               if (g_ascii_strncasecmp (c, "sync", 4) == 0) {
+                                                       session->command = KVSTORAGE_CMD_SYNC;
+                                                       state = 100;
+                                                       continue;
+                                               }
                                        }
                                        else if (p - c == 6) {
-                                               if (memcmp (c, "delete", 6) == 0) {
+                                               if (g_ascii_strncasecmp (c, "delete", 6) == 0) {
                                                        session->command = KVSTORAGE_CMD_DELETE;
                                                }
+
                                                else {
                                                        return FALSE;
                                                }
@@ -445,6 +451,41 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                                        }
                                }
                        }
+                       else if (session->command == KVSTORAGE_CMD_SYNC) {
+                               if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
+                                       if (!is_redis) {
+                                               return rspamd_dispatcher_write (session->dispather, ERROR_COMMON,
+                                                               sizeof (ERROR_COMMON) - 1, FALSE, TRUE);
+                                       }
+                                       else {
+                                               return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF,
+                                                               sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE);
+                                       }
+                               }
+                               else {
+                                       if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) {
+                                               if (!is_redis) {
+                                                       return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF,
+                                                                       sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE);
+                                               }
+                                               else {
+                                                       return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+                                                                       sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+                                               }
+                                       }
+                                       else {
+                                               if (!is_redis) {
+                                                       return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF,
+                                                                       sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE);
+                                               }
+                                               else {
+                                                       return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF,
+                                                                       sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE);
+                                               }
+                                       }
+                               }
+                               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+                       }
                        else if (session->command == KVSTORAGE_CMD_QUIT) {
                                /* Quit session */
                                free_kvstorage_session (session);
index b513d33ab14ced14877eb354eff00d4654ebd7e4..4f9e8c9518cb641465da79dc42a7e6256cc583ed 100644 (file)
@@ -65,6 +65,7 @@ struct kvstorage_session {
                KVSTORAGE_CMD_SET,
                KVSTORAGE_CMD_GET,
                KVSTORAGE_CMD_DELETE,
+               KVSTORAGE_CMD_SYNC,
                KVSTORAGE_CMD_QUIT
        } command;
        guint id;
index bde3ea1c697de7f1b83ed8c419674b9a2f8cf93a..3fa4bab20e25b9f91e1129629b428a79939fcfbd 100644 (file)
@@ -51,6 +51,7 @@ struct rspamd_sqlite_backend {
        backend_replace replace_func;                           /*< this callback is called when element is replaced */
        backend_lookup lookup_func;                                     /*< this callback is used for lookup of element */
        backend_delete delete_func;                                     /*< this callback is called when an element is deleted */
+       backend_sync sync_func;                                         /*< this callback is called when backend need to be synced */
        backend_destroy destroy_func;                           /*< this callback is used for destroying all elements inside backend */
        sqlite3 *dbp;
        gchar *filename;
@@ -100,8 +101,9 @@ sqlite_process_single_op (struct rspamd_sqlite_backend *db, struct sqlite_op *op
 
 /* Process operations queue */
 static gboolean
-sqlite_process_queue (struct rspamd_sqlite_backend *db)
+sqlite_process_queue (struct rspamd_kv_backend *backend)
 {
+       struct rspamd_sqlite_backend                            *db = (struct rspamd_sqlite_backend *)backend;
        struct sqlite_op                                                        *op;
        GList                                                                           *cur;
 
@@ -261,7 +263,7 @@ rspamd_sqlite_insert (struct rspamd_kv_backend *backend, gpointer key, struct rs
        g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
 
        if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
-               return sqlite_process_queue (db);
+               return sqlite_process_queue (backend);
        }
 
        return TRUE;
@@ -286,7 +288,7 @@ rspamd_sqlite_replace (struct rspamd_kv_backend *backend, gpointer key, struct r
        g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
 
        if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
-               return sqlite_process_queue (db);
+               return sqlite_process_queue (backend);
        }
 
        return TRUE;
@@ -356,7 +358,7 @@ rspamd_sqlite_delete (struct rspamd_kv_backend *backend, gpointer key)
        g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
 
        if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
-               sqlite_process_queue (db);
+               sqlite_process_queue (backend);
        }
 
        return;
@@ -368,7 +370,7 @@ rspamd_sqlite_destroy (struct rspamd_kv_backend *backend)
        struct rspamd_sqlite_backend                            *db = (struct rspamd_sqlite_backend *)backend;
 
        if (db->initialized) {
-               sqlite_process_queue (db);
+               sqlite_process_queue (backend);
                if (db->get_stmt != NULL) {
                        sqlite3_finalize (db->get_stmt);
                }
@@ -422,6 +424,7 @@ rspamd_kv_sqlite_new (const gchar *filename, guint sync_ops)
        new->lookup_func = rspamd_sqlite_lookup;
        new->delete_func = rspamd_sqlite_delete;
        new->replace_func = rspamd_sqlite_replace;
+       new->sync_func = sqlite_process_queue;
        new->destroy_func = rspamd_sqlite_destroy;
 
        return (struct rspamd_kv_backend *)new;