summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-08 18:47:37 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-08 18:47:37 +0300
commitcf21ad184448908536c32495db26f97bffd3f584 (patch)
treee8f4bf0efbcbbd166f79f26bc6bbc3ff6658028f /src
parentd8f9f8f6c64001dab5f0357b1c6af93bf6c3eea5 (diff)
downloadrspamd-cf21ad184448908536c32495db26f97bffd3f584.tar.gz
rspamd-cf21ad184448908536c32495db26f97bffd3f584.zip
Implement lazy backend writing using sync_ops = 0.
Allways wait for kvstorage worker to prevent data corruption.
Diffstat (limited to 'src')
-rw-r--r--src/kvstorage_bdb.c6
-rw-r--r--src/kvstorage_server.c1
-rw-r--r--src/kvstorage_sqlite.c6
-rw-r--r--src/main.c11
4 files changed, 16 insertions, 8 deletions
diff --git a/src/kvstorage_bdb.c b/src/kvstorage_bdb.c
index 280cb5ce9..07fcfd8d8 100644
--- a/src/kvstorage_bdb.c
+++ b/src/kvstorage_bdb.c
@@ -199,7 +199,7 @@ rspamd_bdb_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspam
g_queue_push_head (db->ops_queue, op);
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
- if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+ if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
return bdb_process_queue (backend);
}
@@ -224,7 +224,7 @@ rspamd_bdb_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspa
g_queue_push_head (db->ops_queue, op);
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
- if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+ if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
return bdb_process_queue (backend);
}
@@ -293,7 +293,7 @@ rspamd_bdb_delete (struct rspamd_kv_backend *backend, gpointer key)
g_queue_push_head (db->ops_queue, op);
g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
- if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+ if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
bdb_process_queue (backend);
}
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index d91eaa1c8..a5361057f 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -790,6 +790,7 @@ start_kvstorage_worker (struct rspamd_worker *worker)
}
}
+ msg_info ("syncing storages");
destroy_kvstorage_config ();
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
diff --git a/src/kvstorage_sqlite.c b/src/kvstorage_sqlite.c
index 3fa4bab20..3f4fe6979 100644
--- a/src/kvstorage_sqlite.c
+++ b/src/kvstorage_sqlite.c
@@ -262,7 +262,7 @@ rspamd_sqlite_insert (struct rspamd_kv_backend *backend, gpointer key, struct rs
g_queue_push_head (db->ops_queue, op);
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
- if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+ if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
return sqlite_process_queue (backend);
}
@@ -287,7 +287,7 @@ rspamd_sqlite_replace (struct rspamd_kv_backend *backend, gpointer key, struct r
g_queue_push_head (db->ops_queue, op);
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op);
- if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+ if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
return sqlite_process_queue (backend);
}
@@ -357,7 +357,7 @@ rspamd_sqlite_delete (struct rspamd_kv_backend *backend, gpointer key)
g_queue_push_head (db->ops_queue, op);
g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op);
- if (g_queue_get_length (db->ops_queue) >= db->sync_ops) {
+ if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) {
sqlite_process_queue (backend);
}
diff --git a/src/main.c b/src/main.c
index e3e05503d..51f0a881f 100644
--- a/src/main.c
+++ b/src/main.c
@@ -615,9 +615,16 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused)
if (waitpid (w->pid, &res, 0) == -1) {
if (errno == EINTR) {
- msg_info ("terminate worker %P with SIGKILL", w->pid);
- kill (w->pid, SIGKILL);
got_alarm = 1;
+ if (w->type != TYPE_KVSTORAGE) {
+ msg_info ("terminate worker %P with SIGKILL", w->pid);
+ kill (w->pid, SIGKILL);
+ }
+ else {
+ msg_info ("waiting for storages to sync");
+ wait_for_workers (key, value, unused);
+ return TRUE;
+ }
}
}