]> source.dussan.org Git - rspamd.git/commitdiff
Implement lazy backend writing using sync_ops = 0.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 8 Nov 2011 15:47:37 +0000 (18:47 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Tue, 8 Nov 2011 15:47:37 +0000 (18:47 +0300)
Allways wait for kvstorage worker to prevent data corruption.

src/kvstorage_bdb.c
src/kvstorage_server.c
src/kvstorage_sqlite.c
src/main.c

index 280cb5ce96b0764ab75da102ca33eb1450e57ffd..07fcfd8d880231807a8da2617a3bd0fd7958cc3b 100644 (file)
@@ -199,7 +199,7 @@ rspamd_bdb_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspam
        g_queue_push_head (db->ops_queue, op);
        g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
 
-       if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+       if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
                return bdb_process_queue (backend);
        }
 
@@ -224,7 +224,7 @@ rspamd_bdb_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspa
        g_queue_push_head (db->ops_queue, op);
        g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
 
-       if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+       if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
                return bdb_process_queue (backend);
        }
 
@@ -293,7 +293,7 @@ rspamd_bdb_delete (struct rspamd_kv_backend *backend, gpointer key)
        g_queue_push_head (db->ops_queue, op);
        g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
 
-       if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+       if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
                bdb_process_queue (backend);
        }
 
index d91eaa1c815667768bd939b8e8073c76b7d699e7..a5361057fe301df54ccc777caa2a2f3ef7de1545 100644 (file)
@@ -790,6 +790,7 @@ start_kvstorage_worker (struct rspamd_worker *worker)
                }
        }
 
+       msg_info ("syncing storages");
        destroy_kvstorage_config ();
        close_log (rspamd_main->logger);
        exit (EXIT_SUCCESS);
index 3fa4bab20e25b9f91e1129629b428a79939fcfbd..3f4fe697972cf59a1fefe1ff3a39384838fefdcc 100644 (file)
@@ -262,7 +262,7 @@ rspamd_sqlite_insert (struct rspamd_kv_backend *backend, gpointer key, struct rs
        g_queue_push_head (db->ops_queue, op);
        g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
 
-       if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+       if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
                return sqlite_process_queue (backend);
        }
 
@@ -287,7 +287,7 @@ rspamd_sqlite_replace (struct rspamd_kv_backend *backend, gpointer key, struct r
        g_queue_push_head (db->ops_queue, op);
        g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
 
-       if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+       if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
                return sqlite_process_queue (backend);
        }
 
@@ -357,7 +357,7 @@ rspamd_sqlite_delete (struct rspamd_kv_backend *backend, gpointer key)
        g_queue_push_head (db->ops_queue, op);
        g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
 
-       if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+       if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
                sqlite_process_queue (backend);
        }
 
index e3e05503d21468f9c0fcb9552da67eefd0e8f0e0..51f0a881f84ae8b19b6627ee58a4ce6547198acc 100644 (file)
@@ -615,9 +615,16 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
 
        if (waitpid (w->pid, &res, 0) == -1) {
                if (errno == EINTR) {
-                       msg_info ("terminate worker %P with SIGKILL", w->pid);
-                       kill (w->pid, SIGKILL);
                        got_alarm = 1;
+                       if (w->type != TYPE_KVSTORAGE) {
+                               msg_info ("terminate worker %P with SIGKILL", w->pid);
+                               kill (w->pid, SIGKILL);
+                       }
+                       else {
+                               msg_info ("waiting for storages to sync");
+                               wait_for_workers (key, value, unused);
+                               return TRUE;
+                       }
                }
        }