aboutsummaryrefslogtreecommitdiffstats
path: root/src/fuzzy_storage.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2017-01-26 17:19:08 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2017-01-26 17:19:08 +0000
commit9a6bd708e5d2de1ea31e2b01170fac7e072f6374 (patch)
tree64990380138b08fe1960a404e34f6dc000fed153 /src/fuzzy_storage.c
parent361d7fd5dfd95cf80fd5749a78b63eed77bfe9d2 (diff)
downloadrspamd-9a6bd708e5d2de1ea31e2b01170fac7e072f6374.tar.gz
rspamd-9a6bd708e5d2de1ea31e2b01170fac7e072f6374.zip
[Minor] Use the same protocol as fuzzy replication
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r--src/fuzzy_storage.c75
1 files changed, 72 insertions, 3 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c
index 69c48260d..95bef0e1e 100644
--- a/src/fuzzy_storage.c
+++ b/src/fuzzy_storage.c
@@ -152,12 +152,14 @@ struct rspamd_fuzzy_storage_ctx {
gboolean collection_mode;
struct rspamd_cryptobox_keypair *collection_keypair;
struct rspamd_cryptobox_pubkey *collection_sign_key;
+ gchar *collection_id_file;
struct rspamd_keypair_cache *keypair_cache;
rspamd_lru_hash_t *errors_ips;
struct rspamd_fuzzy_backend *backend;
GQueue *updates_pending;
guint updates_failed;
guint updates_maxfail;
+ guint32 collection_id;
struct rspamd_dns_resolver *resolver;
struct rspamd_config *cfg;
struct rspamd_worker *worker;
@@ -1548,7 +1550,17 @@ rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_
/* Send&Clear updates */
cur = ctx->updates_pending->head;
reply = rspamd_fstring_sized_new (8192);
-
+ /*
+ * Message format:
+ * <uint32_le> - revision
+ * <uint32_le> - size of the next element
+ * <data> - command data
+ * ...
+ * <0> - end of data
+ * ... - ignored
+ */
+ reply = rspamd_fstring_append (reply, (const gchar *)&ctx->collection_id,
+ sizeof (ctx->collection_id));
while (cur) {
io_cmd = cur->data;
@@ -1577,6 +1589,7 @@ rspamd_fuzzy_collection_data_handler (struct rspamd_http_connection_entry *conn_
g_queue_clear (ctx->updates_pending);
/* Clear failed attempts counter */
ctx->updates_failed = 0;
+ ctx->collection_id ++;
rspamd_fuzzy_collection_send_fstring (conn_ent, reply);
return 0;
@@ -2361,6 +2374,7 @@ init_fuzzy (struct rspamd_config *cfg)
rspamd_mempool_add_destructor (cfg->cfg_pool,
(rspamd_mempool_destruct_t)rspamd_ptr_array_free_hard, ctx->mirrors);
ctx->updates_maxfail = DEFAULT_UPDATES_MAXFAIL;
+ ctx->collection_id_file = RSPAMD_DBDIR "/fuzzy_collection.id";
rspamd_rcl_register_worker_option (cfg,
type,
@@ -2552,6 +2566,14 @@ init_fuzzy (struct rspamd_config *cfg)
G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_keypair),
0,
"Use the specified keypair to encrypt collection protocol");
+ rspamd_rcl_register_worker_option (cfg,
+ type,
+ "collection_id_file",
+ rspamd_rcl_parse_struct_string,
+ ctx,
+ G_STRUCT_OFFSET (struct rspamd_fuzzy_storage_ctx, collection_id_file),
+ RSPAMD_CL_FLAG_STRING_PATH,
+ "Store collection epoch in the desired file");
return ctx;
}
@@ -2735,6 +2757,32 @@ start_fuzzy (struct rspamd_worker *worker)
ctx->collection_keypair);
}
+ /* Try to load collection id */
+ if (ctx->collection_id_file) {
+ gint fd;
+
+ fd = rspamd_file_xopen (ctx->collection_id_file, O_RDONLY, 0);
+
+ if (fd == -1) {
+ if (errno != ENOENT) {
+ msg_err ("cannot open collection id from %s: %s",
+ ctx->collection_id_file, strerror (errno));
+ }
+
+ ctx->collection_id = 0;
+ }
+ else {
+ if (read (fd, &ctx->collection_id,
+ sizeof (ctx->collection_id)) == -1) {
+ msg_err ("cannot read collection id from %s: %s",
+ ctx->collection_id_file, strerror (errno));
+ ctx->collection_id = 0;
+ }
+
+ close (fd);
+ }
+ }
+
/* Generate new cookie */
ottery_rand_bytes (ctx->cookie, sizeof (ctx->cookie));
/* Register paths */
@@ -2795,8 +2843,10 @@ start_fuzzy (struct rspamd_worker *worker)
rspamd_worker_block_signals ();
if (worker->index == 0 && g_queue_get_length (ctx->updates_pending) > 0) {
- rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
- event_base_loop (ctx->ev_base, 0);
+ if (!ctx->collection_mode) {
+ rspamd_fuzzy_process_updates_queue (ctx, local_db_name);
+ event_base_loop (ctx->ev_base, 0);
+ }
}
if (!ctx->collection_mode) {
@@ -2804,6 +2854,25 @@ start_fuzzy (struct rspamd_worker *worker)
}
else if (worker->index == 0) {
rspamd_http_router_free (ctx->collection_rt);
+ /* Try to save collection id */
+ gint fd;
+
+ fd = rspamd_file_xopen (ctx->collection_id_file,
+ O_WRONLY | O_CREAT | O_TRUNC, 00644);
+
+ if (fd == -1) {
+ msg_err ("cannot open collection id to store in %s: %s",
+ ctx->collection_id_file, strerror (errno));
+ }
+ else {
+ if (write (fd, &ctx->collection_id,
+ sizeof (ctx->collection_id)) == -1) {
+ msg_err ("cannot store collection id in %s: %s",
+ ctx->collection_id_file, strerror (errno));
+ }
+
+ close (fd);
+ }
}
rspamd_log_close (worker->srv->logger);