diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-09-02 14:32:58 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-09-02 14:32:58 +0100 |
commit | e8abb0bd7ccd7412402e6ecd44c0232d2d18e2a5 (patch) | |
tree | 3525b81285a2c8b47a9bfc9cc11e994daab5e307 /src/fuzzy_storage.c | |
parent | b493b3628def373e40dcdedb21869dc566b3aea1 (diff) | |
download | rspamd-e8abb0bd7ccd7412402e6ecd44c0232d2d18e2a5.tar.gz rspamd-e8abb0bd7ccd7412402e6ecd44c0232d2d18e2a5.zip |
[Rework] Adopt fuzzy storage for abstract backend
Diffstat (limited to 'src/fuzzy_storage.c')
-rw-r--r-- | src/fuzzy_storage.c | 470 |
1 files changed, 259 insertions, 211 deletions
diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index 0d29cd197..d663a89e4 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -36,15 +36,11 @@ #include "libutil/http_private.h" #include "unix-std.h" -/* This number is used as expire time in seconds for cache items (2 days) */ -#define DEFAULT_EXPIRE 172800L /* Resync value in seconds */ #define DEFAULT_SYNC_TIMEOUT 60.0 #define DEFAULT_KEYPAIR_CACHE_SIZE 512 #define DEFAULT_MASTER_TIMEOUT 10.0 -#define INVALID_NODE_TIME (guint64) - 1 - static const gchar *local_db_name = "local"; #define msg_err_fuzzy_update(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ @@ -77,10 +73,6 @@ worker_t fuzzy_worker = { RSPAMD_WORKER_VER /* Version info */ }; -/* For evtimer */ -static struct timeval tmv; -static struct event tev; - struct fuzzy_global_stat { guint64 fuzzy_hashes; /**< number of fuzzy hashes stored */ @@ -139,7 +131,7 @@ struct rspamd_fuzzy_storage_ctx { gboolean encrypted_only; struct rspamd_keypair_cache *keypair_cache; rspamd_lru_hash_t *errors_ips; - struct rspamd_fuzzy_backend_sqlite *backend; + struct rspamd_fuzzy_backend *backend; GQueue *updates_pending; struct rspamd_dns_resolver *resolver; struct rspamd_config *cfg; @@ -165,6 +157,7 @@ struct fuzzy_session { } cmd; struct rspamd_fuzzy_encrypted_reply reply; + struct fuzzy_key_stat *ip_stat; enum rspamd_fuzzy_epoch epoch; enum fuzzy_cmd_type cmd_type; @@ -191,7 +184,10 @@ struct fuzzy_master_update_session { const gchar *name; gchar uid[16]; struct rspamd_http_connection *conn; + struct rspamd_http_message *msg; struct rspamd_fuzzy_storage_ctx *ctx; + const gchar *src; + gchar *psrc; rspamd_inet_addr_t *addr; gboolean replied; gint sock; @@ -242,6 +238,14 @@ fuzzy_key_dtor (gpointer p) g_slice_free1 (sizeof (*key), key); } +static void +fuzzy_count_callback (guint64 count, void *ud) +{ + struct rspamd_fuzzy_storage_ctx *ctx = ud; + + ctx->stat.fuzzy_hashes = count; +} + struct fuzzy_slave_connection { struct rspamd_cryptobox_keypair *local_key; struct rspamd_cryptobox_pubkey *remote_key; @@ -266,19 +270,34 @@ fuzzy_mirror_close_connection (struct fuzzy_slave_connection *conn) } } +struct rspamd_fuzzy_updates_cbdata { + struct rspamd_fuzzy_storage_ctx *ctx; + struct rspamd_http_message *msg; + struct fuzzy_slave_connection *conn; + struct rspamd_fuzzy_mirror *m; +}; + static void -fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx, - struct rspamd_http_message *msg) +fuzzy_mirror_updates_version_cb (guint64 rev64, void *ud) { + struct rspamd_fuzzy_updates_cbdata *cbdata = ud; GList *cur; struct fuzzy_peer_cmd *io_cmd; - guint32 len; - guint32 rev; + guint32 rev32 = rev64, len; const gchar *p; rspamd_fstring_t *reply; + struct fuzzy_slave_connection *conn; + struct rspamd_fuzzy_storage_ctx *ctx; + struct rspamd_http_message *msg; + struct rspamd_fuzzy_mirror *m; + struct timeval tv; - rev = rspamd_fuzzy_backend_sqlite_version (ctx->backend, local_db_name); - rev = GUINT32_TO_LE (rev); + conn = cbdata->conn; + ctx = cbdata->ctx; + msg = cbdata->msg; + m = cbdata->m; + g_slice_free1 (sizeof (*cbdata), cbdata); + rev32 = GUINT32_TO_LE (rev32); len = sizeof (guint32) * 2; /* revision + last chunk */ for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) { @@ -295,8 +314,8 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx, } reply = rspamd_fstring_sized_new (len); - reply = rspamd_fstring_append (reply, (const char *)&rev, - sizeof (rev)); + reply = rspamd_fstring_append (reply, (const char *)&rev32, + sizeof (rev32)); for (cur = ctx->updates_pending->head; cur != NULL; cur = g_list_next (cur)) { io_cmd = cur->data; @@ -320,6 +339,30 @@ fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_storage_ctx *ctx, len = 0; reply = rspamd_fstring_append (reply, (const char *)&len, sizeof (len)); rspamd_http_message_set_body_from_fstring_steal (msg, reply); + double_to_tv (ctx->sync_timeout, &tv); + rspamd_http_connection_write_message (conn->http_conn, + msg, NULL, NULL, conn, + conn->sock, + &tv, ctx->ev_base); + msg_info ("send update request to %s", m->name); +} + +static void +fuzzy_mirror_updates_to_http (struct rspamd_fuzzy_mirror *m, + struct fuzzy_slave_connection *conn, + struct rspamd_fuzzy_storage_ctx *ctx, + struct rspamd_http_message *msg) +{ + + struct rspamd_fuzzy_updates_cbdata *cbdata; + + cbdata = g_slice_alloc (sizeof (*cbdata)); + cbdata->ctx = ctx; + cbdata->msg = msg; + cbdata->conn = conn; + cbdata->m = m; + rspamd_fuzzy_backend_version (ctx->backend, local_db_name, + fuzzy_mirror_updates_version_cb, cbdata); } static void @@ -353,7 +396,6 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, { struct fuzzy_slave_connection *conn; struct rspamd_http_message *msg; - struct timeval tv; conn = g_slice_alloc0 (sizeof (*conn)); conn->up = rspamd_upstream_get (m->u, @@ -389,84 +431,85 @@ rspamd_fuzzy_send_update_mirror (struct rspamd_fuzzy_storage_ctx *ctx, rspamd_http_connection_set_key (conn->http_conn, ctx->sync_keypair); msg->peer_key = rspamd_pubkey_ref (m->key); - double_to_tv (ctx->sync_timeout, &tv); - fuzzy_mirror_updates_to_http (ctx, msg); + fuzzy_mirror_updates_to_http (m, conn, ctx, msg); +} - rspamd_http_connection_write_message (conn->http_conn, - msg, NULL, NULL, conn, - conn->sock, - &tv, ctx->ev_base); - msg_info ("send update request to %s", m->name); +struct rspamd_updates_cbdata { + struct rspamd_fuzzy_storage_ctx *ctx; + const gchar *source; +}; + +static void +fuzzy_update_version_callback (guint64 ver, void *ud) +{ + msg_info ("updated fuzzy storage from %s: version: %d", + (const char *)ud, (gint)ver); } static void -rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx, - const gchar *source) +rspamd_fuzzy_updates_cb (gboolean success, void *ud) { - GList *cur; - struct fuzzy_peer_cmd *io_cmd; - struct rspamd_fuzzy_cmd *cmd; - gpointer ptr; + struct rspamd_updates_cbdata *cbdata = ud; struct rspamd_fuzzy_mirror *m; guint nupdates = 0, i; + struct rspamd_fuzzy_storage_ctx *ctx; + const gchar *source; + GList *cur; + struct fuzzy_peer_cmd *io_cmd; - if (ctx->updates_pending && - g_queue_get_length (ctx->updates_pending) > 0 && - rspamd_fuzzy_backend_sqlite_prepare_update (ctx->backend, source)) { - cur = ctx->updates_pending->head; + ctx = cbdata->ctx; + source = cbdata->source; - while (cur) { - io_cmd = cur->data; + if (success) { + rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); - if (io_cmd->is_shingle) { - cmd = &io_cmd->cmd.shingle.basic; - ptr = &io_cmd->cmd.shingle; - } - else { - cmd = &io_cmd->cmd.normal; - ptr = &io_cmd->cmd.normal; - } + if (nupdates > 0) { + for (i = 0; i < ctx->mirrors->len; i ++) { + m = g_ptr_array_index (ctx->mirrors, i); - if (cmd->cmd == FUZZY_WRITE) { - rspamd_fuzzy_backend_sqlite_add (ctx->backend, ptr); - } - else { - rspamd_fuzzy_backend_sqlite_del (ctx->backend, ptr); + rspamd_fuzzy_send_update_mirror (ctx, m); } + } + + /* Clear updates */ + cur = ctx->updates_pending->head; - nupdates++; + while (cur) { + io_cmd = cur->data; + g_slice_free1 (sizeof (*io_cmd), io_cmd); cur = g_list_next (cur); } - if (rspamd_fuzzy_backend_sqlite_finish_update (ctx->backend, source, nupdates > 0)) { - ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_sqlite_count (ctx->backend); + g_queue_clear (ctx->updates_pending); + rspamd_fuzzy_backend_version (ctx->backend, source, + fuzzy_update_version_callback, (void *)source); - if (nupdates > 0) { - for (i = 0; i < ctx->mirrors->len; i ++) { - m = g_ptr_array_index (ctx->mirrors, i); + } + else { + msg_err ("cannot commit update transaction to fuzzy backend, " + "%ud updates are still pending", + g_queue_get_length (ctx->updates_pending)); + } - rspamd_fuzzy_send_update_mirror (ctx, m); - } - } + g_slice_free1 (sizeof (*cbdata), cbdata); +} - /* Clear updates */ - cur = ctx->updates_pending->head; +static void +rspamd_fuzzy_process_updates_queue (struct rspamd_fuzzy_storage_ctx *ctx, + const gchar *source) +{ + + struct rspamd_updates_cbdata *cbdata; + + if (ctx->updates_pending && + g_queue_get_length (ctx->updates_pending) > 0) { + cbdata = g_slice_alloc (sizeof (*cbdata)); + cbdata->ctx = ctx; + cbdata->source = source; + rspamd_fuzzy_backend_process_updates (ctx->backend, ctx->updates_pending, + source, rspamd_fuzzy_updates_cb, cbdata); - while (cur) { - io_cmd = cur->data; - g_slice_free1 (sizeof (*io_cmd), io_cmd); - cur = g_list_next (cur); - } - g_queue_clear (ctx->updates_pending); - msg_info ("updated fuzzy storage: %ud updates processed, version: %d", - nupdates, rspamd_fuzzy_backend_sqlite_version (ctx->backend, source)); - } - else { - msg_err ("cannot commit update transaction to fuzzy backend, " - "%ud updates are still pending", - g_queue_get_length (ctx->updates_pending)); - } } else if (ctx->updates_pending && g_queue_get_length (ctx->updates_pending) > 0) { @@ -599,6 +642,77 @@ rspamd_fuzzy_update_stats (struct rspamd_fuzzy_storage_ctx *ctx, } static void +rspamd_fuzzy_make_reply (struct rspamd_fuzzy_cmd *cmd, + struct rspamd_fuzzy_reply *result, + struct fuzzy_session *session, + gboolean encrypted, gboolean is_shingle) +{ + if (cmd) { + result->tag = cmd->tag; + + memcpy (&session->reply.rep, result, sizeof (*result)); + + rspamd_fuzzy_update_stats (session->ctx, + session->epoch, + result->prob > 0.5, + is_shingle, + session->key_stat, + session->ip_stat, + cmd->cmd, + result->value); + + if (encrypted) { + /* We need also to encrypt reply */ + ottery_rand_bytes (session->reply.hdr.nonce, + sizeof (session->reply.hdr.nonce)); + rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep, + sizeof (session->reply.rep), + session->reply.hdr.nonce, + session->nm, + session->reply.hdr.mac, + RSPAMD_CRYPTOBOX_MODE_25519); + } + } + + rspamd_fuzzy_write_reply (session); +} + +static void +rspamd_fuzzy_check_callback (struct rspamd_fuzzy_reply *result, void *ud) +{ + struct fuzzy_session *session = ud; + gboolean encrypted = FALSE, is_shingle = FALSE; + struct rspamd_fuzzy_cmd *cmd = NULL; + gsize up_len = 0; + + switch (session->cmd_type) { + case CMD_NORMAL: + cmd = &session->cmd.normal; + up_len = sizeof (session->cmd.normal); + break; + case CMD_SHINGLE: + cmd = &session->cmd.shingle.basic; + up_len = sizeof (session->cmd.shingle); + is_shingle = TRUE; + break; + case CMD_ENCRYPTED_NORMAL: + cmd = &session->cmd.enc_normal.cmd; + up_len = sizeof (session->cmd.normal); + encrypted = TRUE; + break; + case CMD_ENCRYPTED_SHINGLE: + cmd = &session->cmd.enc_shingle.cmd.basic; + up_len = sizeof (session->cmd.shingle); + encrypted = TRUE; + is_shingle = TRUE; + break; + } + + rspamd_fuzzy_make_reply (cmd, result, session, encrypted, is_shingle); + REF_RELEASE (session); +} + +static void rspamd_fuzzy_process_command (struct fuzzy_session *session) { gboolean encrypted = FALSE, is_shingle = FALSE; @@ -637,14 +751,16 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) if (G_UNLIKELY (cmd == NULL || up_len == 0)) { result.value = 500; result.prob = 0.0; - goto reply; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); + return; } if (session->ctx->encrypted_only && !encrypted) { /* Do not accept unencrypted commands */ result.value = 403; result.prob = 0.0; - goto reply; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); + return; } if (session->key_stat) { @@ -657,17 +773,21 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) rspamd_lru_hash_insert (session->key_stat->last_ips, naddr, ip_stat, -1, 0); } + + session->ip_stat = ip_stat; } result.flag = cmd->flag; if (cmd->cmd == FUZZY_CHECK) { - result = rspamd_fuzzy_backend_sqlite_check (session->ctx->backend, cmd, - session->ctx->expire); + REF_RETAIN (session); + rspamd_fuzzy_backend_check (session->ctx->backend, cmd, + rspamd_fuzzy_check_callback, session); } else if (cmd->cmd == FUZZY_STAT) { result.prob = 1.0; result.value = 0; - result.flag = rspamd_fuzzy_backend_sqlite_count (session->ctx->backend); + result.flag = session->ctx->stat.fuzzy_hashes; + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); } else { if (rspamd_fuzzy_check_client (session)) { @@ -703,36 +823,9 @@ rspamd_fuzzy_process_command (struct fuzzy_session *session) result.value = 403; result.prob = 0.0; } - } - -reply: - if (cmd) { - result.tag = cmd->tag; - memcpy (&session->reply.rep, &result, sizeof (result)); - - rspamd_fuzzy_update_stats (session->ctx, - session->epoch, - result.prob > 0.5, - is_shingle, - session->key_stat, - ip_stat, cmd->cmd, - result.value); - - if (encrypted) { - /* We need also to encrypt reply */ - ottery_rand_bytes (session->reply.hdr.nonce, - sizeof (session->reply.hdr.nonce)); - rspamd_cryptobox_encrypt_nm_inplace ((guchar *)&session->reply.rep, - sizeof (session->reply.rep), - session->reply.hdr.nonce, - session->nm, - session->reply.hdr.mac, - RSPAMD_CRYPTOBOX_MODE_25519); - } + rspamd_fuzzy_make_reply (cmd, &result, session, encrypted, is_shingle); } - - rspamd_fuzzy_write_reply (session); } @@ -916,12 +1009,11 @@ rspamd_fuzzy_cmd_from_wire (guchar *buf, guint buflen, struct fuzzy_session *s) static void rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, - struct rspamd_http_message *msg) + struct rspamd_http_message *msg, guint our_rev) { const guchar *p; - gchar *src = NULL, *psrc; gsize remain; - gint32 revision, our_rev; + gint32 revision; guint32 len = 0, cnt = 0; struct fuzzy_peer_cmd cmd, *pcmd; enum { @@ -932,25 +1024,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, GList *updates = NULL, *cur; gpointer flag_ptr; - if (!rspamd_http_message_get_body (msg, NULL) || !msg->url - || msg->url->len == 0) { - msg_err_fuzzy_update ("empty update message, not processing"); - - return; - } - - /* Detect source from url: /update_v1/<source>, so we look for the last '/' */ - remain = msg->url->len; - psrc = rspamd_fstringdup (msg->url); - src = psrc; - - while (remain--) { - if (src[remain] == '/') { - src = &src[remain + 1]; - break; - } - } - /* * Message format: * <uint32_le> - revision @@ -965,13 +1038,11 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, if (remain > sizeof (gint32) * 2) { memcpy (&revision, p, sizeof (gint32)); revision = GINT32_TO_LE (revision); - our_rev = rspamd_fuzzy_backend_sqlite_version (session->ctx->backend, src); if (revision <= our_rev) { msg_err_fuzzy_update ("remote revision: %d is older than ours: %d, " "refusing update", revision, our_rev); - g_free (psrc); return; } @@ -1076,7 +1147,7 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, cur->data = NULL; } - rspamd_fuzzy_process_updates_queue (session->ctx, src); + rspamd_fuzzy_process_updates_queue (session->ctx, session->src); msg_info_fuzzy_update ("processed updates from the master %s, " "%ud operations processed," " revision: %d (local revision: %d)", @@ -1084,8 +1155,6 @@ rspamd_fuzzy_mirror_process_update (struct fuzzy_master_update_session *session, cnt, revision, our_rev); err: - g_free (psrc); - if (updates) { /* We still need to clear queue */ for (cur = updates; cur != NULL; cur = g_list_next (cur)) { @@ -1119,6 +1188,10 @@ rspamd_fuzzy_mirror_session_destroy (struct fuzzy_master_update_session *session rspamd_http_connection_unref (session->conn); rspamd_inet_address_destroy (session->addr); close (session->sock); + + if (session->psrc) { + g_free (session->psrc); + } g_slice_free1 (sizeof (*session), session); } } @@ -1151,6 +1224,15 @@ rspamd_fuzzy_mirror_send_reply (struct fuzzy_master_update_session *session, session->ctx->ev_base); } +static void +rspamd_fuzzy_update_version_callback (guint64 version, void *ud) +{ + struct fuzzy_master_update_session *session = ud; + + rspamd_fuzzy_mirror_process_update (session, session->msg, version); + rspamd_fuzzy_mirror_send_reply (session, 200, "OK"); +} + static gint rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg) @@ -1158,6 +1240,9 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, struct fuzzy_master_update_session *session = conn->ud; const struct rspamd_cryptobox_pubkey *rk; const gchar *err_str = NULL; + gchar *psrc; + const gchar *src = NULL; + gsize remain; if (session->replied) { rspamd_fuzzy_mirror_session_destroy (session); @@ -1189,9 +1274,30 @@ rspamd_fuzzy_mirror_finish_handler (struct rspamd_http_connection *conn, msg_warn_fuzzy_update ("no trusted key specified, accept any update from %s", rspamd_inet_address_to_string (session->addr)); } + if (!rspamd_http_message_get_body (msg, NULL) || !msg->url + || msg->url->len == 0) { + msg_err_fuzzy_update ("empty update message, not processing"); - rspamd_fuzzy_mirror_process_update (session, msg); - rspamd_fuzzy_mirror_send_reply (session, 200, "OK"); + goto end; + } + + /* Detect source from url: /update_v1/<source>, so we look for the last '/' */ + remain = msg->url->len; + psrc = rspamd_fstringdup (msg->url); + src = psrc; + + while (remain--) { + if (src[remain] == '/') { + src = &src[remain + 1]; + break; + } + } + + session->src = src; + session->psrc = psrc; + session->msg = msg; + rspamd_fuzzy_backend_version (session->ctx->backend, src, + rspamd_fuzzy_update_version_callback, session); return 0; } @@ -1352,38 +1458,6 @@ accept_fuzzy_socket (gint fd, short what, void *arg) } } -static void -sync_callback (gint fd, short what, void *arg) -{ - struct rspamd_worker *worker = (struct rspamd_worker *)arg; - struct rspamd_fuzzy_storage_ctx *ctx; - gdouble next_check; - guint64 old_expired, new_expired; - - ctx = worker->ctx; - - if (ctx->backend) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name); - /* Call backend sync */ - old_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend); - rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE); - new_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend); - - if (old_expired < new_expired) { - ctx->stat.fuzzy_hashes_expired += new_expired - old_expired; - } - } - - /* Timer event */ - event_del (&tev); - evtimer_set (&tev, sync_callback, worker); - event_base_set (ctx->ev_base, &tev); - /* Plan event with jitter */ - next_check = rspamd_time_jitter (ctx->sync_timeout, 0); - double_to_tv (next_check, &tmv); - evtimer_add (&tev, &tmv); -} - static gboolean rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main, struct rspamd_worker *worker, gint fd, @@ -1392,29 +1466,13 @@ rspamd_fuzzy_storage_sync (struct rspamd_main *rspamd_main, gpointer ud) { struct rspamd_fuzzy_storage_ctx *ctx = ud; - gdouble next_check; - guint64 old_expired, new_expired; struct rspamd_control_reply rep; - if (ctx->backend) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name); - /* Call backend sync */ - old_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend); - rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE); - new_expired = rspamd_fuzzy_backend_sqlite_expired (ctx->backend); - - if (old_expired < new_expired) { - ctx->stat.fuzzy_hashes_expired += new_expired - old_expired; - } - } - rep.reply.fuzzy_sync.status = 0; - /* Handle next timer event */ - event_del (&tev); - next_check = rspamd_time_jitter (ctx->sync_timeout, 0); - double_to_tv (next_check, &tmv); - evtimer_add (&tev, &tmv); + if (ctx->backend && worker->index == 0) { + rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout); + } if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { msg_err ("cannot write reply to the control socket: %s", @@ -1439,14 +1497,14 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main, if (ctx->backend) { /* Close backend and reopen it one more time */ - rspamd_fuzzy_backend_sqlite_close (ctx->backend); + rspamd_fuzzy_backend_close (ctx->backend); } memset (&rep, 0, sizeof (rep)); rep.type = RSPAMD_CONTROL_RELOAD; - if ((ctx->backend = rspamd_fuzzy_backend_sqlite_open (ctx->hashfile, - TRUE, + if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base, + worker->cf->options, &err)) == NULL) { msg_err ("cannot open backend after reload: %e", err); g_error_free (err); @@ -1456,6 +1514,10 @@ rspamd_fuzzy_storage_reload (struct rspamd_main *rspamd_main, rep.reply.reload.status = 0; } + if (ctx->backend && worker->index == 0) { + rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout); + } + if (write (fd, &rep, sizeof (rep)) != sizeof (rep)) { msg_err ("cannot write reply to the control socket: %s", strerror (errno)); @@ -1644,7 +1706,7 @@ rspamd_fuzzy_storage_stat (struct rspamd_main *rspamd_main, rep.reply.fuzzy_stat.status = 0; memcpy (rep.reply.fuzzy_stat.storage_id, - rspamd_fuzzy_sqlite_backend_id (ctx->backend), + rspamd_fuzzy_backend_id (ctx->backend), sizeof (rep.reply.fuzzy_stat.storage_id)); obj = rspamd_fuzzy_stat_to_ucl (ctx, TRUE); @@ -1901,7 +1963,6 @@ init_fuzzy (struct rspamd_config *cfg) ctx->magic = rspamd_fuzzy_storage_magic; ctx->sync_timeout = DEFAULT_SYNC_TIMEOUT; ctx->master_timeout = DEFAULT_MASTER_TIMEOUT; - ctx->expire = DEFAULT_EXPIRE; ctx->keypair_cache_size = DEFAULT_KEYPAIR_CACHE_SIZE; ctx->keys = g_hash_table_new_full (fuzzy_kp_hash, fuzzy_kp_equal, NULL, fuzzy_key_dtor); @@ -2108,7 +2169,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker, GList *cur; struct rspamd_worker_listen_socket *ls; struct event *accept_events; - gdouble next_check; ctx->peer_fd = rep_fd; @@ -2157,14 +2217,6 @@ fuzzy_peer_rep (struct rspamd_worker *worker, event_base_set (ctx->ev_base, &ctx->peer_ev); event_add (&ctx->peer_ev, NULL); ctx->updates_pending = g_queue_new (); - - /* Timer event */ - evtimer_set (&tev, sync_callback, worker); - event_base_set (ctx->ev_base, &tev); - /* Plan event with jitter */ - next_check = rspamd_time_jitter (ctx->sync_timeout, 0); - double_to_tv (next_check, &tmv); - evtimer_add (&tev, &tmv); } } @@ -2188,13 +2240,14 @@ start_fuzzy (struct rspamd_worker *worker) /* * Open DB and perform VACUUM */ - if ((ctx->backend = rspamd_fuzzy_backend_sqlite_open (ctx->hashfile, TRUE, &err)) == NULL) { + if ((ctx->backend = rspamd_fuzzy_backend_create (ctx->ev_base, + worker->cf->options, &err)) == NULL) { msg_err ("cannot open backend: %e", err); g_error_free (err); exit (EXIT_SUCCESS); } - ctx->stat.fuzzy_hashes = rspamd_fuzzy_backend_sqlite_count (ctx->backend); + rspamd_fuzzy_backend_count (ctx->backend, fuzzy_count_callback, ctx); if (ctx->keypair_cache_size > 0) { /* Create keypairs cache */ @@ -2202,7 +2255,7 @@ start_fuzzy (struct rspamd_worker *worker) } if (worker->index == 0) { - rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE); + rspamd_fuzzy_backend_start_expire (ctx->backend, ctx->sync_timeout); } if (ctx->mirrors && ctx->mirrors->len != 0) { @@ -2260,12 +2313,7 @@ start_fuzzy (struct rspamd_worker *worker) event_base_loop (ctx->ev_base, 0); rspamd_worker_block_signals (); - if (worker->index == 0) { - rspamd_fuzzy_process_updates_queue (ctx, local_db_name); - rspamd_fuzzy_backend_sqlite_sync (ctx->backend, ctx->expire, TRUE); - } - - rspamd_fuzzy_backend_sqlite_close (ctx->backend); + rspamd_fuzzy_backend_close (ctx->backend); rspamd_log_close (worker->srv->logger); if (ctx->peer_fd != -1) { |