Allways wait for kvstorage worker to prevent data corruption.tags/0.4.5
@@ -199,7 +199,7 @@ rspamd_bdb_insert (struct rspamd_kv_backend *backend, gpointer key, struct rspam | |||
g_queue_push_head (db->ops_queue, op); | |||
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); | |||
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
return bdb_process_queue (backend); | |||
} | |||
@@ -224,7 +224,7 @@ rspamd_bdb_replace (struct rspamd_kv_backend *backend, gpointer key, struct rspa | |||
g_queue_push_head (db->ops_queue, op); | |||
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); | |||
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
return bdb_process_queue (backend); | |||
} | |||
@@ -293,7 +293,7 @@ rspamd_bdb_delete (struct rspamd_kv_backend *backend, gpointer key) | |||
g_queue_push_head (db->ops_queue, op); | |||
g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op); | |||
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
bdb_process_queue (backend); | |||
} | |||
@@ -790,6 +790,7 @@ start_kvstorage_worker (struct rspamd_worker *worker) | |||
} | |||
} | |||
msg_info ("syncing storages"); | |||
destroy_kvstorage_config (); | |||
close_log (rspamd_main->logger); | |||
exit (EXIT_SUCCESS); |
@@ -262,7 +262,7 @@ rspamd_sqlite_insert (struct rspamd_kv_backend *backend, gpointer key, struct rs | |||
g_queue_push_head (db->ops_queue, op); | |||
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); | |||
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
return sqlite_process_queue (backend); | |||
} | |||
@@ -287,7 +287,7 @@ rspamd_sqlite_replace (struct rspamd_kv_backend *backend, gpointer key, struct r | |||
g_queue_push_head (db->ops_queue, op); | |||
g_hash_table_insert (db->ops_hash, ELT_KEY (elt), op); | |||
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
return sqlite_process_queue (backend); | |||
} | |||
@@ -357,7 +357,7 @@ rspamd_sqlite_delete (struct rspamd_kv_backend *backend, gpointer key) | |||
g_queue_push_head (db->ops_queue, op); | |||
g_hash_table_insert (db->ops_hash, ELT_KEY(elt), op); | |||
if (g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
if (db->sync_ops > 0 && g_queue_get_length (db->ops_queue) >= db->sync_ops) { | |||
sqlite_process_queue (backend); | |||
} | |||
@@ -615,9 +615,16 @@ wait_for_workers (gpointer key, gpointer value, gpointer unused) | |||
if (waitpid (w->pid, &res, 0) == -1) { | |||
if (errno == EINTR) { | |||
msg_info ("terminate worker %P with SIGKILL", w->pid); | |||
kill (w->pid, SIGKILL); | |||
got_alarm = 1; | |||
if (w->type != TYPE_KVSTORAGE) { | |||
msg_info ("terminate worker %P with SIGKILL", w->pid); | |||
kill (w->pid, SIGKILL); | |||
} | |||
else { | |||
msg_info ("waiting for storages to sync"); | |||
wait_for_workers (key, value, unused); | |||
return TRUE; | |||
} | |||
} | |||
} | |||