diff options
Diffstat (limited to 'src/libserver/fuzzy_backend')
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend.c | 569 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend.h | 131 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_redis.c | 1564 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_redis.h | 67 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c | 1065 | ||||
-rw-r--r-- | src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h | 107 |
6 files changed, 3503 insertions, 0 deletions
diff --git a/src/libserver/fuzzy_backend/fuzzy_backend.c b/src/libserver/fuzzy_backend/fuzzy_backend.c new file mode 100644 index 000000000..f6dec1d6e --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend.c @@ -0,0 +1,569 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" +#include "fuzzy_backend.h" +#include "fuzzy_backend_sqlite.h" +#include "fuzzy_backend_redis.h" +#include "cfg_file.h" +#include "fuzzy_wire.h" + +#define DEFAULT_EXPIRE 172800L + +enum rspamd_fuzzy_backend_type { + RSPAMD_FUZZY_BACKEND_SQLITE = 0, + RSPAMD_FUZZY_BACKEND_REDIS = 1, +}; + +static void* rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, struct rspamd_config *cfg, GError **err); +static void rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud); +static void rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud); +static void rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud); +static void rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud); +static const gchar* rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud); +static void rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud); +static void rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud); + +struct rspamd_fuzzy_backend_subr { + void* (*init) (struct rspamd_fuzzy_backend *bk, const ucl_object_t *obj, + struct rspamd_config *cfg, + GError **err); + void (*check) (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud); + void (*update) (struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud); + void (*count) (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud); + void (*version) (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud); + const gchar* (*id) (struct rspamd_fuzzy_backend *bk, void *subr_ud); + void (*periodic) (struct rspamd_fuzzy_backend *bk, void *subr_ud); + void (*close) (struct rspamd_fuzzy_backend *bk, void *subr_ud); +}; + +static const struct rspamd_fuzzy_backend_subr fuzzy_subrs[] = { + [RSPAMD_FUZZY_BACKEND_SQLITE] = { + .init = rspamd_fuzzy_backend_init_sqlite, + .check = rspamd_fuzzy_backend_check_sqlite, + .update = rspamd_fuzzy_backend_update_sqlite, + .count = rspamd_fuzzy_backend_count_sqlite, + .version = rspamd_fuzzy_backend_version_sqlite, + .id = rspamd_fuzzy_backend_id_sqlite, + .periodic = rspamd_fuzzy_backend_expire_sqlite, + .close = rspamd_fuzzy_backend_close_sqlite, + }, +#ifdef WITH_HIREDIS + [RSPAMD_FUZZY_BACKEND_REDIS] = { + .init = rspamd_fuzzy_backend_init_redis, + .check = rspamd_fuzzy_backend_check_redis, + .update = rspamd_fuzzy_backend_update_redis, + .count = rspamd_fuzzy_backend_count_redis, + .version = rspamd_fuzzy_backend_version_redis, + .id = rspamd_fuzzy_backend_id_redis, + .periodic = rspamd_fuzzy_backend_expire_redis, + .close = rspamd_fuzzy_backend_close_redis, + } +#endif +}; + +struct rspamd_fuzzy_backend { + enum rspamd_fuzzy_backend_type type; + gdouble expire; + gdouble sync; + struct ev_loop *event_loop; + rspamd_fuzzy_periodic_cb periodic_cb; + void *periodic_ud; + const struct rspamd_fuzzy_backend_subr *subr; + void *subr_ud; + ev_timer periodic_event; +}; + +static GQuark +rspamd_fuzzy_backend_quark (void) +{ + return g_quark_from_static_string ("fuzzy-backend"); +} + +static void* +rspamd_fuzzy_backend_init_sqlite (struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, struct rspamd_config *cfg, GError **err) +{ + const ucl_object_t *elt; + + elt = ucl_object_lookup_any (obj, "hashfile", "hash_file", "file", + "database", NULL); + + if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { + g_set_error (err, rspamd_fuzzy_backend_quark (), + EINVAL, "missing sqlite3 path"); + return NULL; + } + + return rspamd_fuzzy_backend_sqlite_open (ucl_object_tostring (elt), + FALSE, err); +} + +static void +rspamd_fuzzy_backend_check_sqlite (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + struct rspamd_fuzzy_reply rep; + + rep = rspamd_fuzzy_backend_sqlite_check (sq, cmd, bk->expire); + + if (cb) { + cb (&rep, ud); + } +} + +static void +rspamd_fuzzy_backend_update_sqlite (struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + gboolean success = FALSE; + guint i; + struct fuzzy_peer_cmd *io_cmd; + struct rspamd_fuzzy_cmd *cmd; + gpointer ptr; + guint nupdates = 0, nadded = 0, ndeleted = 0, nextended = 0, nignored = 0; + + if (rspamd_fuzzy_backend_sqlite_prepare_update (sq, src)) { + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + ptr = &io_cmd->cmd.shingle; + } + else { + cmd = &io_cmd->cmd.normal; + ptr = &io_cmd->cmd.normal; + } + + if (cmd->cmd == FUZZY_WRITE) { + rspamd_fuzzy_backend_sqlite_add (sq, ptr); + nadded ++; + nupdates ++; + } + else if (cmd->cmd == FUZZY_DEL) { + rspamd_fuzzy_backend_sqlite_del (sq, ptr); + ndeleted ++; + nupdates ++; + } + else { + if (cmd->cmd == FUZZY_REFRESH) { + nextended ++; + } + else { + nignored ++; + } + } + } + + if (rspamd_fuzzy_backend_sqlite_finish_update (sq, src, + nupdates > 0)) { + success = TRUE; + } + } + + if (cb) { + cb (success, nadded, ndeleted, nextended, nignored, ud); + } +} + +static void +rspamd_fuzzy_backend_count_sqlite (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + guint64 nhashes; + + nhashes = rspamd_fuzzy_backend_sqlite_count (sq); + + if (cb) { + cb (nhashes, ud); + } +} + +static void +rspamd_fuzzy_backend_version_sqlite (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + guint64 rev; + + rev = rspamd_fuzzy_backend_sqlite_version (sq, src); + + if (cb) { + cb (rev, ud); + } +} + +static const gchar* +rspamd_fuzzy_backend_id_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + + return rspamd_fuzzy_sqlite_backend_id (sq); +} +static void +rspamd_fuzzy_backend_expire_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + + rspamd_fuzzy_backend_sqlite_sync (sq, bk->expire, TRUE); +} + +static void +rspamd_fuzzy_backend_close_sqlite (struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_sqlite *sq = subr_ud; + + rspamd_fuzzy_backend_sqlite_close (sq); +} + + +struct rspamd_fuzzy_backend * +rspamd_fuzzy_backend_create (struct ev_loop *ev_base, + const ucl_object_t *config, + struct rspamd_config *cfg, + GError **err) +{ + struct rspamd_fuzzy_backend *bk; + enum rspamd_fuzzy_backend_type type = RSPAMD_FUZZY_BACKEND_SQLITE; + const ucl_object_t *elt; + gdouble expire = DEFAULT_EXPIRE; + + if (config != NULL) { + elt = ucl_object_lookup (config, "backend"); + + if (elt != NULL && ucl_object_type (elt) == UCL_STRING) { + if (strcmp (ucl_object_tostring (elt), "sqlite") == 0) { + type = RSPAMD_FUZZY_BACKEND_SQLITE; + } + else if (strcmp (ucl_object_tostring (elt), "redis") == 0) { + type = RSPAMD_FUZZY_BACKEND_REDIS; + } + else { + g_set_error (err, rspamd_fuzzy_backend_quark (), + EINVAL, "invalid backend type: %s", + ucl_object_tostring (elt)); + return NULL; + } + } + + elt = ucl_object_lookup (config, "expire"); + + if (elt != NULL) { + expire = ucl_object_todouble (elt); + } + } + + bk = g_malloc0 (sizeof (*bk)); + bk->event_loop = ev_base; + bk->expire = expire; + bk->type = type; + bk->subr = &fuzzy_subrs[type]; + + if ((bk->subr_ud = bk->subr->init (bk, config, cfg, err)) == NULL) { + g_free (bk); + + return NULL; + } + + return bk; +} + + +void +rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud) +{ + g_assert (bk != NULL); + + bk->subr->check (bk, cmd, cb, ud, bk->subr_ud); +} + +static guint +rspamd_fuzzy_digest_hash (gconstpointer key) +{ + guint ret; + + /* Distirbuted uniformly already */ + memcpy (&ret, key, sizeof (ret)); + + return ret; +} + +static gboolean +rspamd_fuzzy_digest_equal (gconstpointer v, gconstpointer v2) +{ + return memcmp (v, v2, rspamd_cryptobox_HASHBYTES) == 0; +} + +static void +rspamd_fuzzy_backend_deduplicate_queue (GArray *updates) +{ + GHashTable *seen = g_hash_table_new (rspamd_fuzzy_digest_hash, + rspamd_fuzzy_digest_equal); + struct fuzzy_peer_cmd *io_cmd, *found; + struct rspamd_fuzzy_cmd *cmd; + guchar *digest; + guint i; + + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + } + + digest = cmd->digest; + + found = g_hash_table_lookup (seen, digest); + + if (found == NULL) { + /* Add to the seen list, if not a duplicate (huh?) */ + if (cmd->cmd != FUZZY_DUP) { + g_hash_table_insert (seen, digest, io_cmd); + } + } + else { + if (found->cmd.normal.flag != cmd->flag) { + /* TODO: deal with flags better at some point */ + continue; + } + + /* Apply heuristic */ + switch (cmd->cmd) { + case FUZZY_WRITE: + if (found->cmd.normal.cmd == FUZZY_WRITE) { + /* Already seen */ + found->cmd.normal.value += cmd->value; + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + else if (found->cmd.normal.cmd == FUZZY_REFRESH) { + /* Seen refresh command, remove it as write has higher priority */ + g_hash_table_replace (seen, digest, io_cmd); + found->cmd.normal.cmd = FUZZY_DUP; + } + else if (found->cmd.normal.cmd == FUZZY_DEL) { + /* Request delete + add, weird, but ignore add */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + break; + case FUZZY_REFRESH: + if (found->cmd.normal.cmd == FUZZY_WRITE) { + /* No need to expire, handled by addition */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + else if (found->cmd.normal.cmd == FUZZY_DEL) { + /* Request delete + expire, ignore expire */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + else if (found->cmd.normal.cmd == FUZZY_REFRESH) { + /* Already handled */ + cmd->cmd = FUZZY_DUP; /* Ignore this one */ + } + break; + case FUZZY_DEL: + /* Delete has priority over all other commands */ + g_hash_table_replace (seen, digest, io_cmd); + found->cmd.normal.cmd = FUZZY_DUP; + break; + default: + break; + } + } + } + + g_hash_table_unref (seen); +} + +void +rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, + void *ud) +{ + g_assert (bk != NULL); + g_assert (updates != NULL); + + if (updates) { + rspamd_fuzzy_backend_deduplicate_queue (updates); + bk->subr->update (bk, updates, src, cb, ud, bk->subr_ud); + } + else if (cb) { + cb (TRUE, 0, 0, 0, 0, ud); + } +} + + +void +rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud) +{ + g_assert (bk != NULL); + + bk->subr->count (bk, cb, ud, bk->subr_ud); +} + + +void +rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud) +{ + g_assert (bk != NULL); + + bk->subr->version (bk, src, cb, ud, bk->subr_ud); +} + +const gchar * +rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *bk) +{ + g_assert (bk != NULL); + + if (bk->subr->id) { + return bk->subr->id (bk, bk->subr_ud); + } + + return NULL; +} + +static inline void +rspamd_fuzzy_backend_periodic_sync (struct rspamd_fuzzy_backend *bk) +{ + if (bk->periodic_cb) { + if (bk->periodic_cb (bk->periodic_ud)) { + if (bk->subr->periodic) { + bk->subr->periodic (bk, bk->subr_ud); + } + } + } + else { + if (bk->subr->periodic) { + bk->subr->periodic (bk, bk->subr_ud); + } + } +} + +static void +rspamd_fuzzy_backend_periodic_cb (EV_P_ ev_timer *w, int revents) +{ + struct rspamd_fuzzy_backend *bk = (struct rspamd_fuzzy_backend *)w->data; + gdouble jittered; + + jittered = rspamd_time_jitter (bk->sync, bk->sync / 2.0); + w->repeat = jittered; + rspamd_fuzzy_backend_periodic_sync (bk); + ev_timer_again (EV_A_ w); +} + +void +rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *bk, + gdouble timeout, + rspamd_fuzzy_periodic_cb cb, + void *ud) +{ + gdouble jittered; + + g_assert (bk != NULL); + + if (bk->subr->periodic) { + if (bk->sync > 0.0) { + ev_timer_stop (bk->event_loop, &bk->periodic_event); + } + + if (cb) { + bk->periodic_cb = cb; + bk->periodic_ud = ud; + } + + rspamd_fuzzy_backend_periodic_sync (bk); + bk->sync = timeout; + jittered = rspamd_time_jitter (timeout, timeout / 2.0); + + bk->periodic_event.data = bk; + ev_timer_init (&bk->periodic_event, rspamd_fuzzy_backend_periodic_cb, + jittered, 0.0); + ev_timer_start (bk->event_loop, &bk->periodic_event); + } +} + +void +rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *bk) +{ + g_assert (bk != NULL); + + if (bk->sync > 0.0) { + rspamd_fuzzy_backend_periodic_sync (bk); + ev_timer_stop (bk->event_loop, &bk->periodic_event); + } + + bk->subr->close (bk, bk->subr_ud); + + g_free (bk); +} + +struct ev_loop* +rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend) +{ + return backend->event_loop; +} + +gdouble +rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend) +{ + return backend->expire; +} diff --git a/src/libserver/fuzzy_backend/fuzzy_backend.h b/src/libserver/fuzzy_backend/fuzzy_backend.h new file mode 100644 index 000000000..23b9b68ef --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend.h @@ -0,0 +1,131 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SRC_LIBSERVER_FUZZY_BACKEND_H_ +#define SRC_LIBSERVER_FUZZY_BACKEND_H_ + +#include "config.h" +#include "contrib/libev/ev.h" +#include "fuzzy_wire.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct rspamd_fuzzy_backend; +struct rspamd_config; + +/* + * Callbacks for fuzzy methods + */ +typedef void (*rspamd_fuzzy_check_cb) (struct rspamd_fuzzy_reply *rep, void *ud); + +typedef void (*rspamd_fuzzy_update_cb) (gboolean success, + guint nadded, + guint ndeleted, + guint nextended, + guint nignored, + void *ud); + +typedef void (*rspamd_fuzzy_version_cb) (guint64 rev, void *ud); + +typedef void (*rspamd_fuzzy_count_cb) (guint64 count, void *ud); + +typedef gboolean (*rspamd_fuzzy_periodic_cb) (void *ud); + +/** + * Open fuzzy backend + * @param ev_base + * @param config + * @param err + * @return + */ +struct rspamd_fuzzy_backend *rspamd_fuzzy_backend_create (struct ev_loop *ev_base, + const ucl_object_t *config, + struct rspamd_config *cfg, + GError **err); + + +/** + * Check a specific hash in storage + * @param cmd + * @param cb + * @param ud + */ +void rspamd_fuzzy_backend_check (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud); + +/** + * Process updates for a specific queue + * @param bk + * @param updates queue of struct fuzzy_peer_cmd + * @param src + */ +void rspamd_fuzzy_backend_process_updates (struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, rspamd_fuzzy_update_cb cb, + void *ud); + +/** + * Gets number of hashes from the backend + * @param bk + * @param cb + * @param ud + */ +void rspamd_fuzzy_backend_count (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud); + +/** + * Returns number of revision for a specific source + * @param bk + * @param src + * @param cb + * @param ud + */ +void rspamd_fuzzy_backend_version (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud); + +/** + * Returns unique id for backend + * @param backend + * @return + */ +const gchar *rspamd_fuzzy_backend_id (struct rspamd_fuzzy_backend *backend); + +/** + * Starts expire process for the backend + * @param backend + */ +void rspamd_fuzzy_backend_start_update (struct rspamd_fuzzy_backend *backend, + gdouble timeout, + rspamd_fuzzy_periodic_cb cb, + void *ud); + +struct ev_loop *rspamd_fuzzy_backend_event_base (struct rspamd_fuzzy_backend *backend); + +gdouble rspamd_fuzzy_backend_get_expire (struct rspamd_fuzzy_backend *backend); + +/** + * Closes backend + * @param backend + */ +void rspamd_fuzzy_backend_close (struct rspamd_fuzzy_backend *backend); + +#ifdef __cplusplus +} +#endif + +#endif /* SRC_LIBSERVER_FUZZY_BACKEND_H_ */ diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.c b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c new file mode 100644 index 000000000..7070773b3 --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.c @@ -0,0 +1,1564 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "config.h" +#include "ref.h" +#include "fuzzy_backend.h" +#include "fuzzy_backend_redis.h" +#include "redis_pool.h" +#include "cryptobox.h" +#include "str_util.h" +#include "upstream.h" +#include "contrib/hiredis/hiredis.h" +#include "contrib/hiredis/async.h" +#include "lua/lua_common.h" + +#define REDIS_DEFAULT_PORT 6379 +#define REDIS_DEFAULT_OBJECT "fuzzy" +#define REDIS_DEFAULT_TIMEOUT 2.0 + +#define msg_err_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_redis_session(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ + "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_redis_session(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_fuzzy_redis_log_id, "fuzzy_redis", session->backend->id, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(fuzzy_redis) + +struct rspamd_fuzzy_backend_redis { + lua_State *L; + const gchar *redis_object; + const gchar *password; + const gchar *dbname; + gchar *id; + struct rspamd_redis_pool *pool; + gdouble timeout; + gint conf_ref; + ref_entry_t ref; +}; + +enum rspamd_fuzzy_redis_command { + RSPAMD_FUZZY_REDIS_COMMAND_COUNT, + RSPAMD_FUZZY_REDIS_COMMAND_VERSION, + RSPAMD_FUZZY_REDIS_COMMAND_UPDATES, + RSPAMD_FUZZY_REDIS_COMMAND_CHECK +}; + +struct rspamd_fuzzy_redis_session { + struct rspamd_fuzzy_backend_redis *backend; + redisAsyncContext *ctx; + ev_timer timeout; + const struct rspamd_fuzzy_cmd *cmd; + struct ev_loop *event_loop; + float prob; + gboolean shingles_checked; + + enum rspamd_fuzzy_redis_command command; + guint nargs; + + guint nadded; + guint ndeleted; + guint nextended; + guint nignored; + + union { + rspamd_fuzzy_check_cb cb_check; + rspamd_fuzzy_update_cb cb_update; + rspamd_fuzzy_version_cb cb_version; + rspamd_fuzzy_count_cb cb_count; + } callback; + void *cbdata; + + gchar **argv; + gsize *argv_lens; + struct upstream *up; + guchar found_digest[rspamd_cryptobox_HASHBYTES]; +}; + +static inline struct upstream_list * +rspamd_redis_get_servers (struct rspamd_fuzzy_backend_redis *ctx, + const gchar *what) +{ + lua_State *L = ctx->L; + struct upstream_list *res; + + lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref); + lua_pushstring (L, what); + lua_gettable (L, -2); + res = *((struct upstream_list**)lua_touserdata (L, -1)); + lua_settop (L, 0); + + return res; +} + +static inline void +rspamd_fuzzy_redis_session_free_args (struct rspamd_fuzzy_redis_session *session) +{ + guint i; + + if (session->argv) { + for (i = 0; i < session->nargs; i ++) { + g_free (session->argv[i]); + } + + g_free (session->argv); + g_free (session->argv_lens); + } +} +static void +rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session, + gboolean is_fatal) +{ + redisAsyncContext *ac; + + + if (session->ctx) { + ac = session->ctx; + session->ctx = NULL; + rspamd_redis_pool_release_connection (session->backend->pool, + ac, + is_fatal ? RSPAMD_REDIS_RELEASE_FATAL : RSPAMD_REDIS_RELEASE_DEFAULT); + } + + ev_timer_stop (session->event_loop, &session->timeout); + rspamd_fuzzy_redis_session_free_args (session); + + REF_RELEASE (session->backend); + g_free (session); +} + +static void +rspamd_fuzzy_backend_redis_dtor (struct rspamd_fuzzy_backend_redis *backend) +{ + lua_State *L = backend->L; + + if (backend->conf_ref) { + luaL_unref (L, LUA_REGISTRYINDEX, backend->conf_ref); + } + + if (backend->id) { + g_free (backend->id); + } + + g_free (backend); +} + +void* +rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, struct rspamd_config *cfg, GError **err) +{ + struct rspamd_fuzzy_backend_redis *backend; + const ucl_object_t *elt; + gboolean ret = FALSE; + guchar id_hash[rspamd_cryptobox_HASHBYTES]; + rspamd_cryptobox_hash_state_t st; + lua_State *L = (lua_State *)cfg->lua_state; + gint conf_ref = -1; + + backend = g_malloc0 (sizeof (*backend)); + + backend->timeout = REDIS_DEFAULT_TIMEOUT; + backend->redis_object = REDIS_DEFAULT_OBJECT; + backend->L = L; + + ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref); + + /* Now try global redis settings */ + if (!ret) { + elt = ucl_object_lookup (cfg->rcl_obj, "redis"); + + if (elt) { + const ucl_object_t *specific_obj; + + specific_obj = ucl_object_lookup_any (elt, "fuzzy", "fuzzy_storage", + NULL); + + if (specific_obj) { + ret = rspamd_lua_try_load_redis (L, specific_obj, cfg, &conf_ref); + } + else { + ret = rspamd_lua_try_load_redis (L, elt, cfg, &conf_ref); + } + } + } + + if (!ret) { + msg_err_config ("cannot init redis backend for fuzzy storage"); + g_free (backend); + + return NULL; + } + + elt = ucl_object_lookup (obj, "prefix"); + if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { + backend->redis_object = REDIS_DEFAULT_OBJECT; + } + else { + backend->redis_object = ucl_object_tostring (elt); + } + + backend->conf_ref = conf_ref; + + /* Check some common table values */ + lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref); + + lua_pushstring (L, "timeout"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TNUMBER) { + backend->timeout = lua_tonumber (L, -1); + } + lua_pop (L, 1); + + lua_pushstring (L, "db"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TSTRING) { + backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool, + lua_tostring (L, -1)); + } + lua_pop (L, 1); + + lua_pushstring (L, "password"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TSTRING) { + backend->password = rspamd_mempool_strdup (cfg->cfg_pool, + lua_tostring (L, -1)); + } + lua_pop (L, 1); + + lua_settop (L, 0); + + REF_INIT_RETAIN (backend, rspamd_fuzzy_backend_redis_dtor); + backend->pool = cfg->redis_pool; + rspamd_cryptobox_hash_init (&st, NULL, 0); + rspamd_cryptobox_hash_update (&st, backend->redis_object, + strlen (backend->redis_object)); + + if (backend->dbname) { + rspamd_cryptobox_hash_update (&st, backend->dbname, + strlen (backend->dbname)); + } + + if (backend->password) { + rspamd_cryptobox_hash_update (&st, backend->password, + strlen (backend->password)); + } + + rspamd_cryptobox_hash_final (&st, id_hash); + backend->id = rspamd_encode_base32 (id_hash, sizeof (id_hash)); + + return backend; +} + +static void +rspamd_fuzzy_redis_timeout (EV_P_ ev_timer *w, int revents) +{ + struct rspamd_fuzzy_redis_session *session = + (struct rspamd_fuzzy_redis_session *)w->data; + redisAsyncContext *ac; + static char errstr[128]; + + if (session->ctx) { + ac = session->ctx; + session->ctx = NULL; + ac->err = REDIS_ERR_IO; + /* Should be safe as in hiredis it is char[128] */ + rspamd_snprintf (errstr, sizeof (errstr), "%s", strerror (ETIMEDOUT)); + ac->errstr = errstr; + + /* This will cause session closing */ + rspamd_redis_pool_release_connection (session->backend->pool, + ac, RSPAMD_REDIS_RELEASE_FATAL); + } +} + +static void rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, + gpointer priv); + +struct _rspamd_fuzzy_shingles_helper { + guchar digest[64]; + guint found; +}; + +static gint +rspamd_fuzzy_backend_redis_shingles_cmp (const void *a, const void *b) +{ + const struct _rspamd_fuzzy_shingles_helper *sha = a, + *shb = b; + + return memcmp (sha->digest, shb->digest, sizeof (sha->digest)); +} + +static void +rspamd_fuzzy_redis_shingles_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r, *cur; + struct rspamd_fuzzy_reply rep; + GString *key; + struct _rspamd_fuzzy_shingles_helper *shingles, *prev = NULL, *sel = NULL; + guint i, found = 0, max_found = 0, cur_found = 0; + + ev_timer_stop (session->event_loop, &session->timeout); + memset (&rep, 0, sizeof (rep)); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_ARRAY && + reply->elements == RSPAMD_SHINGLE_SIZE) { + shingles = g_alloca (sizeof (struct _rspamd_fuzzy_shingles_helper) * + RSPAMD_SHINGLE_SIZE); + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + cur = reply->element[i]; + + if (cur->type == REDIS_REPLY_STRING) { + shingles[i].found = 1; + memcpy (shingles[i].digest, cur->str, MIN (64, cur->len)); + found ++; + } + else { + memset (shingles[i].digest, 0, sizeof (shingles[i].digest)); + shingles[i].found = 0; + } + } + + if (found > RSPAMD_SHINGLE_SIZE / 2) { + /* Now sort to find the most frequent element */ + qsort (shingles, RSPAMD_SHINGLE_SIZE, + sizeof (struct _rspamd_fuzzy_shingles_helper), + rspamd_fuzzy_backend_redis_shingles_cmp); + + prev = &shingles[0]; + + for (i = 1; i < RSPAMD_SHINGLE_SIZE; i ++) { + if (!shingles[i].found) { + continue; + } + + if (memcmp (shingles[i].digest, prev->digest, 64) == 0) { + cur_found ++; + + if (cur_found > max_found) { + max_found = cur_found; + sel = &shingles[i]; + } + } + else { + cur_found = 1; + prev = &shingles[i]; + } + } + + if (max_found > RSPAMD_SHINGLE_SIZE / 2) { + session->prob = ((float)max_found) / RSPAMD_SHINGLE_SIZE; + rep.v1.prob = session->prob; + + g_assert (sel != NULL); + + /* Prepare new check command */ + rspamd_fuzzy_redis_session_free_args (session); + session->nargs = 5; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + + key = g_string_new (session->backend->redis_object); + g_string_append_len (key, sel->digest, sizeof (sel->digest)); + session->argv[0] = g_strdup ("HMGET"); + session->argv_lens[0] = 5; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + session->argv[2] = g_strdup ("V"); + session->argv_lens[2] = 1; + session->argv[3] = g_strdup ("F"); + session->argv_lens[3] = 1; + session->argv[4] = g_strdup ("C"); + session->argv_lens[4] = 1; + g_string_free (key, FALSE); /* Do not free underlying array */ + memcpy (session->found_digest, sel->digest, + sizeof (session->cmd->digest)); + + g_assert (session->ctx != NULL); + if (redisAsyncCommandArgv (session->ctx, + rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **)session->argv, + session->argv_lens) != REDIS_OK) { + + if (session->callback.cb_check) { + memset (&rep, 0, sizeof (rep)); + session->callback.cb_check (&rep, session->cbdata); + } + + rspamd_fuzzy_redis_session_dtor (session, TRUE); + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + + return; + } + } + } + + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error getting shingles: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +static void +rspamd_fuzzy_backend_check_shingles (struct rspamd_fuzzy_redis_session *session) +{ + struct rspamd_fuzzy_reply rep; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + GString *key; + guint i, init_len; + + rspamd_fuzzy_redis_session_free_args (session); + /* First of all check digest */ + session->nargs = RSPAMD_SHINGLE_SIZE + 1; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *)session->cmd; + + session->argv[0] = g_strdup ("MGET"); + session->argv_lens[0] = 4; + init_len = strlen (session->backend->redis_object); + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + + key = g_string_sized_new (init_len + 2 + 2 + sizeof ("18446744073709551616")); + rspamd_printf_gstring (key, "%s_%d_%uL", session->backend->redis_object, + i, shcmd->sgl.hashes[i]); + session->argv[i + 1] = key->str; + session->argv_lens[i + 1] = key->len; + g_string_free (key, FALSE); /* Do not free underlying array */ + } + + session->shingles_checked = TRUE; + + g_assert (session->ctx != NULL); + + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_shingles_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + msg_err ("cannot execute redis command: %s", session->ctx->errstr); + + if (session->callback.cb_check) { + memset (&rep, 0, sizeof (rep)); + session->callback.cb_check (&rep, session->cbdata); + } + + rspamd_fuzzy_redis_session_dtor (session, TRUE); + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } +} + +static void +rspamd_fuzzy_redis_check_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r, *cur; + struct rspamd_fuzzy_reply rep; + gulong value; + guint found_elts = 0; + + ev_timer_stop (session->event_loop, &session->timeout); + memset (&rep, 0, sizeof (rep)); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_ARRAY && reply->elements >= 2) { + cur = reply->element[0]; + + if (cur->type == REDIS_REPLY_STRING) { + value = strtoul (cur->str, NULL, 10); + rep.v1.value = value; + found_elts ++; + } + + cur = reply->element[1]; + + if (cur->type == REDIS_REPLY_STRING) { + value = strtoul (cur->str, NULL, 10); + rep.v1.flag = value; + found_elts ++; + } + + if (found_elts >= 2) { + rep.v1.prob = session->prob; + memcpy (rep.digest, session->found_digest, sizeof (rep.digest)); + } + + rep.ts = 0; + + if (reply->elements > 2) { + cur = reply->element[2]; + + if (cur->type == REDIS_REPLY_STRING) { + rep.ts = strtoul (cur->str, NULL, 10); + } + } + } + + if (found_elts != 2) { + if (session->cmd->shingles_count > 0 && !session->shingles_checked) { + /* We also need to check all shingles here */ + rspamd_fuzzy_backend_check_shingles (session); + /* Do not free session */ + return; + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + } + } + else { + if (session->callback.cb_check) { + session->callback.cb_check (&rep, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error getting hashes: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +void +rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + struct rspamd_fuzzy_reply rep; + GString *key; + + g_assert (backend != NULL); + + session = g_malloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + session->callback.cb_check = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_CHECK; + session->cmd = cmd; + session->prob = 1.0; + memcpy (rep.digest, session->cmd->digest, sizeof (rep.digest)); + memcpy (session->found_digest, session->cmd->digest, sizeof (rep.digest)); + session->event_loop = rspamd_fuzzy_backend_event_base (bk); + + /* First of all check digest */ + session->nargs = 5; + session->argv = g_malloc (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc (sizeof (gsize) * session->nargs); + + key = g_string_new (backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + session->argv[0] = g_strdup ("HMGET"); + session->argv_lens[0] = 5; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + session->argv[2] = g_strdup ("V"); + session->argv_lens[2] = 1; + session->argv[3] = g_strdup ("F"); + session->argv_lens[3] = 1; + session->argv[4] = g_strdup ("C"); + session->argv_lens[4] = 1; + g_string_free (key, FALSE); /* Do not free underlying array */ + + ups = rspamd_redis_get_servers (backend, "read_servers"); + up = rspamd_upstream_get (ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr_next (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail (up, TRUE, strerror (errno)); + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + memset (&rep, 0, sizeof (rep)); + cb (&rep, ud); + } + } + else { + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_check_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + memset (&rep, 0, sizeof (rep)); + cb (&rep, ud); + } + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + } +} + +static void +rspamd_fuzzy_redis_count_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + gulong nelts; + + ev_timer_stop (session->event_loop, &session->timeout); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_INTEGER) { + if (session->callback.cb_count) { + session->callback.cb_count (reply->integer, session->cbdata); + } + } + else if (reply->type == REDIS_REPLY_STRING) { + nelts = strtoul (reply->str, NULL, 10); + + if (session->callback.cb_count) { + session->callback.cb_count (nelts, session->cbdata); + } + } + else { + if (session->callback.cb_count) { + session->callback.cb_count (0, session->cbdata); + } + } + } + else { + if (session->callback.cb_count) { + session->callback.cb_count (0, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error getting count: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +void +rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + GString *key; + + g_assert (backend != NULL); + + session = g_malloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + session->callback.cb_count = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_COUNT; + session->event_loop = rspamd_fuzzy_backend_event_base (bk); + + session->nargs = 2; + session->argv = g_malloc (sizeof (gchar *) * 2); + session->argv_lens = g_malloc (sizeof (gsize) * 2); + key = g_string_new (backend->redis_object); + g_string_append (key, "_count"); + session->argv[0] = g_strdup ("GET"); + session->argv_lens[0] = 3; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + g_string_free (key, FALSE); /* Do not free underlying array */ + + ups = rspamd_redis_get_servers (backend, "read_servers"); + up = rspamd_upstream_get (ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr_next (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail (up, TRUE, strerror (errno)); + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (0, ud); + } + } + else { + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_count_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (0, ud); + } + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + } +} + +static void +rspamd_fuzzy_redis_version_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + gulong nelts; + + ev_timer_stop (session->event_loop, &session->timeout); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_INTEGER) { + if (session->callback.cb_version) { + session->callback.cb_version (reply->integer, session->cbdata); + } + } + else if (reply->type == REDIS_REPLY_STRING) { + nelts = strtoul (reply->str, NULL, 10); + + if (session->callback.cb_version) { + session->callback.cb_version (nelts, session->cbdata); + } + } + else { + if (session->callback.cb_version) { + session->callback.cb_version (0, session->cbdata); + } + } + } + else { + if (session->callback.cb_version) { + session->callback.cb_version (0, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error getting version: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +void +rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + GString *key; + + g_assert (backend != NULL); + + session = g_malloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + session->callback.cb_version = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_VERSION; + session->event_loop = rspamd_fuzzy_backend_event_base (bk); + + session->nargs = 2; + session->argv = g_malloc (sizeof (gchar *) * 2); + session->argv_lens = g_malloc (sizeof (gsize) * 2); + key = g_string_new (backend->redis_object); + g_string_append (key, src); + session->argv[0] = g_strdup ("GET"); + session->argv_lens[0] = 3; + session->argv[1] = key->str; + session->argv_lens[1] = key->len; + g_string_free (key, FALSE); /* Do not free underlying array */ + + ups = rspamd_redis_get_servers (backend, "read_servers"); + up = rspamd_upstream_get (ups, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr_next (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail (up, FALSE, strerror (errno)); + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (0, ud); + } + } + else { + if (redisAsyncCommandArgv (session->ctx, rspamd_fuzzy_redis_version_callback, + session, session->nargs, + (const gchar **)session->argv, session->argv_lens) != REDIS_OK) { + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (0, ud); + } + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + } +} + +const gchar* +rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + g_assert (backend != NULL); + + return backend->id; +} + +void +rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + + g_assert (backend != NULL); +} + +static gboolean +rspamd_fuzzy_update_append_command (struct rspamd_fuzzy_backend *bk, + struct rspamd_fuzzy_redis_session *session, + struct fuzzy_peer_cmd *io_cmd, guint *shift) +{ + GString *key, *value; + guint cur_shift = *shift; + guint i, klen; + struct rspamd_fuzzy_cmd *cmd; + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + + } + + if (cmd->cmd == FUZZY_WRITE) { + /* + * For each normal hash addition we do 5 redis commands: + * HSET <key> F <flag> + * HSETNX <key> C <time> + * HINCRBY <key> V <weight> + * EXPIRE <key> <expire> + * Where <key> is <prefix> || <digest> + */ + + /* HSET */ + klen = strlen (session->backend->redis_object) + + sizeof (cmd->digest) + 1; + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", cmd->flag); + session->argv[cur_shift] = g_strdup ("HSET"); + session->argv_lens[cur_shift++] = sizeof ("HSET") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup ("F"); + session->argv_lens[cur_shift++] = sizeof ("F") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 4, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* HSETNX */ + klen = strlen (session->backend->redis_object) + + sizeof (cmd->digest) + 1; + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("18446744073709551616")); + rspamd_printf_gstring (value, "%L", (gint64)rspamd_get_calendar_ticks ()); + session->argv[cur_shift] = g_strdup ("HSETNX"); + session->argv_lens[cur_shift++] = sizeof ("HSETNX") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup ("C"); + session->argv_lens[cur_shift++] = sizeof ("C") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 4, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* HINCRBY */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", cmd->value); + session->argv[cur_shift] = g_strdup ("HINCRBY"); + session->argv_lens[cur_shift++] = sizeof ("HINCRBY") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = g_strdup ("V"); + session->argv_lens[cur_shift++] = sizeof ("V") - 1; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 4, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + + /* EXPIRE */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", + (gint)rspamd_fuzzy_backend_get_expire (bk)); + session->argv[cur_shift] = g_strdup ("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 3, + (const gchar **)&session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + + /* INCR */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append (key, "_count"); + session->argv[cur_shift] = g_strdup ("INCR"); + session->argv_lens[cur_shift++] = sizeof ("INCR") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + } + else if (cmd->cmd == FUZZY_DEL) { + /* DEL */ + klen = strlen (session->backend->redis_object) + + sizeof (cmd->digest) + 1; + + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + session->argv[cur_shift] = g_strdup ("DEL"); + session->argv_lens[cur_shift++] = sizeof ("DEL") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + + /* DECR */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append (key, "_count"); + session->argv[cur_shift] = g_strdup ("DECR"); + session->argv_lens[cur_shift++] = sizeof ("DECR") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + } + else if (cmd->cmd == FUZZY_REFRESH) { + /* + * Issue refresh command by just EXPIRE command + * EXPIRE <key> <expire> + * Where <key> is <prefix> || <digest> + */ + + klen = strlen (session->backend->redis_object) + + sizeof (cmd->digest) + 1; + + /* EXPIRE */ + key = g_string_sized_new (klen); + g_string_append (key, session->backend->redis_object); + g_string_append_len (key, cmd->digest, sizeof (cmd->digest)); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", + (gint)rspamd_fuzzy_backend_get_expire (bk)); + session->argv[cur_shift] = g_strdup ("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 3, + (const gchar **)&session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + } + else if (cmd->cmd == FUZZY_DUP) { + /* Ignore */ + } + else { + g_assert_not_reached (); + } + + if (io_cmd->is_shingle) { + if (cmd->cmd == FUZZY_WRITE) { + klen = strlen (session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + guchar *hval; + /* + * For each command with shingles we additionally emit 32 commands: + * SETEX <prefix>_<number>_<value> <expire> <digest> + */ + + /* SETEX */ + key = g_string_sized_new (klen); + rspamd_printf_gstring (key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + value = g_string_sized_new (sizeof ("4294967296")); + rspamd_printf_gstring (value, "%d", + (gint)rspamd_fuzzy_backend_get_expire (bk)); + hval = g_malloc (sizeof (io_cmd->cmd.shingle.basic.digest)); + memcpy (hval, io_cmd->cmd.shingle.basic.digest, + sizeof (io_cmd->cmd.shingle.basic.digest)); + session->argv[cur_shift] = g_strdup ("SETEX"); + session->argv_lens[cur_shift++] = sizeof ("SETEX") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + session->argv[cur_shift] = hval; + session->argv_lens[cur_shift++] = sizeof (io_cmd->cmd.shingle.basic.digest); + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 4, + (const gchar **)&session->argv[cur_shift - 4], + &session->argv_lens[cur_shift - 4]) != REDIS_OK) { + + return FALSE; + } + } + } + else if (cmd->cmd == FUZZY_DEL) { + klen = strlen (session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + key = g_string_sized_new (klen); + rspamd_printf_gstring (key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + session->argv[cur_shift] = g_strdup ("DEL"); + session->argv_lens[cur_shift++] = sizeof ("DEL") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + return FALSE; + } + } + } + else if (cmd->cmd == FUZZY_REFRESH) { + klen = strlen (session->backend->redis_object) + + 64 + 1; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + /* + * For each command with shingles we additionally emit 32 commands: + * EXPIRE <prefix>_<number>_<value> <expire> + */ + + /* Expire */ + key = g_string_sized_new (klen); + rspamd_printf_gstring (key, "%s_%d_%uL", + session->backend->redis_object, + i, + io_cmd->cmd.shingle.sgl.hashes[i]); + value = g_string_sized_new (sizeof ("18446744073709551616")); + rspamd_printf_gstring (value, "%d", + (gint)rspamd_fuzzy_backend_get_expire (bk)); + session->argv[cur_shift] = g_strdup ("EXPIRE"); + session->argv_lens[cur_shift++] = sizeof ("EXPIRE") - 1; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift++] = key->len; + session->argv[cur_shift] = value->str; + session->argv_lens[cur_shift++] = value->len; + g_string_free (key, FALSE); + g_string_free (value, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 3, + (const gchar **)&session->argv[cur_shift - 3], + &session->argv_lens[cur_shift - 3]) != REDIS_OK) { + + return FALSE; + } + } + } + else if (cmd->cmd == FUZZY_DUP) { + /* Ignore */ + } + else { + g_assert_not_reached (); + } + } + + *shift = cur_shift; + + return TRUE; +} + +static void +rspamd_fuzzy_redis_update_callback (redisAsyncContext *c, gpointer r, + gpointer priv) +{ + struct rspamd_fuzzy_redis_session *session = priv; + redisReply *reply = r; + + ev_timer_stop (session->event_loop, &session->timeout); + + if (c->err == 0) { + rspamd_upstream_ok (session->up); + + if (reply->type == REDIS_REPLY_ARRAY) { + /* TODO: check all replies somehow */ + if (session->callback.cb_update) { + session->callback.cb_update (TRUE, + session->nadded, + session->ndeleted, + session->nextended, + session->nignored, + session->cbdata); + } + } + else { + if (session->callback.cb_update) { + session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata); + } + } + } + else { + if (session->callback.cb_update) { + session->callback.cb_update (FALSE, 0, 0, 0, 0, session->cbdata); + } + + if (c->errstr) { + msg_err_redis_session ("error sending update to redis: %s", c->errstr); + } + + rspamd_upstream_fail (session->up, FALSE, strerror (errno)); + } + + rspamd_fuzzy_redis_session_dtor (session, FALSE); +} + +void +rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + struct rspamd_fuzzy_redis_session *session; + struct upstream *up; + struct upstream_list *ups; + rspamd_inet_addr_t *addr; + guint i; + GString *key; + struct fuzzy_peer_cmd *io_cmd; + struct rspamd_fuzzy_cmd *cmd = NULL; + guint nargs, ncommands, cur_shift; + + g_assert (backend != NULL); + + session = g_malloc0 (sizeof (*session)); + session->backend = backend; + REF_RETAIN (session->backend); + + /* + * For each normal hash addition we do 3 redis commands: + * HSET <key> F <flag> + * HINCRBY <key> V <weight> + * EXPIRE <key> <expire> + * INCR <prefix||fuzzy_count> + * + * Where <key> is <prefix> || <digest> + * + * For each command with shingles we additionally emit 32 commands: + * SETEX <prefix>_<number>_<value> <expire> <digest> + * + * For each delete command we emit: + * DEL <key> + * + * For each delete command with shingles we emit also 32 commands: + * DEL <prefix>_<number>_<value> + * DECR <prefix||fuzzy_count> + */ + + ncommands = 3; /* For MULTI + EXEC + INCR <src> */ + nargs = 4; + + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); + + if (io_cmd->is_shingle) { + cmd = &io_cmd->cmd.shingle.basic; + } + else { + cmd = &io_cmd->cmd.normal; + } + + if (cmd->cmd == FUZZY_WRITE) { + ncommands += 5; + nargs += 17; + session->nadded ++; + + if (io_cmd->is_shingle) { + ncommands += RSPAMD_SHINGLE_SIZE; + nargs += RSPAMD_SHINGLE_SIZE * 4; + } + + } + else if (cmd->cmd == FUZZY_DEL) { + ncommands += 2; + nargs += 4; + session->ndeleted ++; + + if (io_cmd->is_shingle) { + ncommands += RSPAMD_SHINGLE_SIZE; + nargs += RSPAMD_SHINGLE_SIZE * 2; + } + } + else if (cmd->cmd == FUZZY_REFRESH) { + ncommands += 1; + nargs += 3; + session->nextended ++; + + if (io_cmd->is_shingle) { + ncommands += RSPAMD_SHINGLE_SIZE; + nargs += RSPAMD_SHINGLE_SIZE * 3; + } + } + else { + session->nignored ++; + } + } + + /* Now we need to create a new request */ + session->callback.cb_update = cb; + session->cbdata = ud; + session->command = RSPAMD_FUZZY_REDIS_COMMAND_UPDATES; + session->cmd = cmd; + session->prob = 1.0; + session->event_loop = rspamd_fuzzy_backend_event_base (bk); + + /* First of all check digest */ + session->nargs = nargs; + session->argv = g_malloc0 (sizeof (gchar *) * session->nargs); + session->argv_lens = g_malloc0 (sizeof (gsize) * session->nargs); + + ups = rspamd_redis_get_servers (backend, "write_servers"); + up = rspamd_upstream_get (ups, + RSPAMD_UPSTREAM_MASTER_SLAVE, + NULL, + 0); + + session->up = up; + addr = rspamd_upstream_addr_next (up); + g_assert (addr != NULL); + session->ctx = rspamd_redis_pool_connect (backend->pool, + backend->dbname, backend->password, + rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + + if (session->ctx == NULL) { + rspamd_upstream_fail (up, TRUE, strerror (errno)); + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + } + else { + /* Start with MULTI command */ + session->argv[0] = g_strdup ("MULTI"); + session->argv_lens[0] = 5; + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 1, + (const gchar **)session->argv, + session->argv_lens) != REDIS_OK) { + + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + return; + } + + /* Now split the rest of commands in packs and emit them command by command */ + cur_shift = 1; + + for (i = 0; i < updates->len; i ++) { + io_cmd = &g_array_index (updates, struct fuzzy_peer_cmd, i); + + if (!rspamd_fuzzy_update_append_command (bk, session, io_cmd, + &cur_shift)) { + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + return; + } + } + + /* Now INCR command for the source */ + key = g_string_new (backend->redis_object); + g_string_append (key, src); + session->argv[cur_shift] = g_strdup ("INCR"); + session->argv_lens[cur_shift ++] = 4; + session->argv[cur_shift] = key->str; + session->argv_lens[cur_shift ++] = key->len; + g_string_free (key, FALSE); + + if (redisAsyncCommandArgv (session->ctx, NULL, NULL, + 2, + (const gchar **)&session->argv[cur_shift - 2], + &session->argv_lens[cur_shift - 2]) != REDIS_OK) { + + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + return; + } + + /* Finally we call EXEC with a specific callback */ + session->argv[cur_shift] = g_strdup ("EXEC"); + session->argv_lens[cur_shift] = 4; + + if (redisAsyncCommandArgv (session->ctx, + rspamd_fuzzy_redis_update_callback, session, + 1, + (const gchar **)&session->argv[cur_shift], + &session->argv_lens[cur_shift]) != REDIS_OK) { + + if (cb) { + cb (FALSE, 0, 0, 0, 0, ud); + } + rspamd_fuzzy_redis_session_dtor (session, TRUE); + + return; + } + else { + /* Add timeout */ + session->timeout.data = session; + ev_timer_init (&session->timeout, + rspamd_fuzzy_redis_timeout, + session->backend->timeout, 0.0); + ev_timer_start (session->event_loop, &session->timeout); + } + } +} + +void +rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk, + void *subr_ud) +{ + struct rspamd_fuzzy_backend_redis *backend = subr_ud; + + g_assert (backend != NULL); + + REF_RELEASE (backend); +} diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_redis.h b/src/libserver/fuzzy_backend/fuzzy_backend_redis.h new file mode 100644 index 000000000..544b20f60 --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend_redis.h @@ -0,0 +1,67 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_ +#define SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_ + +#include "config.h" +#include "fuzzy_backend.h" + + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * Subroutines for fuzzy_backend + */ +void *rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk, + const ucl_object_t *obj, + struct rspamd_config *cfg, + GError **err); + +void rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, + const struct rspamd_fuzzy_cmd *cmd, + rspamd_fuzzy_check_cb cb, void *ud, + void *subr_ud); + +void rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, + GArray *updates, const gchar *src, + rspamd_fuzzy_update_cb cb, void *ud, + void *subr_ud); + +void rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, + rspamd_fuzzy_count_cb cb, void *ud, + void *subr_ud); + +void rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk, + const gchar *src, + rspamd_fuzzy_version_cb cb, void *ud, + void *subr_ud); + +const gchar *rspamd_fuzzy_backend_id_redis (struct rspamd_fuzzy_backend *bk, + void *subr_ud); + +void rspamd_fuzzy_backend_expire_redis (struct rspamd_fuzzy_backend *bk, + void *subr_ud); + +void rspamd_fuzzy_backend_close_redis (struct rspamd_fuzzy_backend *bk, + void *subr_ud); + +#ifdef __cplusplus +} +#endif + +#endif /* SRC_LIBSERVER_FUZZY_BACKEND_REDIS_H_ */ diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c new file mode 100644 index 000000000..0f9b3c1ee --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.c @@ -0,0 +1,1065 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "config.h" +#include "rspamd.h" +#include "fuzzy_backend.h" +#include "fuzzy_backend_sqlite.h" +#include "unix-std.h" + +#include <sqlite3.h> +#include "libutil/sqlite_utils.h" + +struct rspamd_fuzzy_backend_sqlite { + sqlite3 *db; + char *path; + gchar id[MEMPOOL_UID_LEN]; + gsize count; + gsize expired; + rspamd_mempool_t *pool; +}; + +static const gdouble sql_sleep_time = 0.1; +static const guint max_retries = 10; + +#define msg_err_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_warn_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_info_fuzzy_backend(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \ + backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) +#define msg_debug_fuzzy_backend(...) rspamd_conditional_debug_fast (NULL, NULL, \ + rspamd_fuzzy_sqlite_log_id, backend->pool->tag.tagname, backend->pool->tag.uid, \ + G_STRFUNC, \ + __VA_ARGS__) + +INIT_LOG_MODULE(fuzzy_sqlite) + +static const char *create_tables_sql = + "BEGIN;" + "CREATE TABLE IF NOT EXISTS digests(" + " id INTEGER PRIMARY KEY," + " flag INTEGER NOT NULL," + " digest TEXT NOT NULL," + " value INTEGER," + " time INTEGER);" + "CREATE TABLE IF NOT EXISTS shingles(" + " value INTEGER NOT NULL," + " number INTEGER NOT NULL," + " digest_id INTEGER REFERENCES digests(id) ON DELETE CASCADE " + " ON UPDATE CASCADE);" + "CREATE TABLE IF NOT EXISTS sources(" + " name TEXT UNIQUE," + " version INTEGER," + " last INTEGER);" + "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" + "CREATE INDEX IF NOT EXISTS t ON digests(time);" + "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);" + "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);" + "COMMIT;"; +#if 0 +static const char *create_index_sql = + "BEGIN;" + "CREATE UNIQUE INDEX IF NOT EXISTS d ON digests(digest);" + "CREATE INDEX IF NOT EXISTS t ON digests(time);" + "CREATE INDEX IF NOT EXISTS dgst_id ON shingles(digest_id);" + "CREATE UNIQUE INDEX IF NOT EXISTS s ON shingles(value, number);" + "COMMIT;"; +#endif +enum rspamd_fuzzy_statement_idx { + RSPAMD_FUZZY_BACKEND_TRANSACTION_START = 0, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK, + RSPAMD_FUZZY_BACKEND_INSERT, + RSPAMD_FUZZY_BACKEND_UPDATE, + RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, + RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, + RSPAMD_FUZZY_BACKEND_CHECK, + RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, + RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, + RSPAMD_FUZZY_BACKEND_DELETE, + RSPAMD_FUZZY_BACKEND_COUNT, + RSPAMD_FUZZY_BACKEND_EXPIRE, + RSPAMD_FUZZY_BACKEND_VACUUM, + RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + RSPAMD_FUZZY_BACKEND_ADD_SOURCE, + RSPAMD_FUZZY_BACKEND_VERSION, + RSPAMD_FUZZY_BACKEND_SET_VERSION, + RSPAMD_FUZZY_BACKEND_MAX +}; +static struct rspamd_fuzzy_stmts { + enum rspamd_fuzzy_statement_idx idx; + const gchar *sql; + const gchar *args; + sqlite3_stmt *stmt; + gint result; +} prepared_stmts[RSPAMD_FUZZY_BACKEND_MAX] = +{ + { + .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_START, + .sql = "BEGIN TRANSACTION;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT, + .sql = "COMMIT;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK, + .sql = "ROLLBACK;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_INSERT, + .sql = "INSERT INTO digests(flag, digest, value, time) VALUES" + "(?1, ?2, ?3, strftime('%s','now'));", + .args = "SDI", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_UPDATE, + .sql = "UPDATE digests SET value = value + ?1, time = strftime('%s','now') WHERE " + "digest==?2;", + .args = "ID", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, + .sql = "UPDATE digests SET value = ?1, flag = ?2, time = strftime('%s','now') WHERE " + "digest==?3;", + .args = "IID", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, + .sql = "INSERT OR REPLACE INTO shingles(value, number, digest_id) " + "VALUES (?1, ?2, ?3);", + .args = "III", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_CHECK, + .sql = "SELECT value, time, flag FROM digests WHERE digest==?1;", + .args = "D", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, + .sql = "SELECT digest_id FROM shingles WHERE value=?1 AND number=?2", + .args = "IS", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, + .sql = "SELECT digest, value, time, flag FROM digests WHERE id=?1", + .args = "I", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_DELETE, + .sql = "DELETE FROM digests WHERE digest==?1;", + .args = "D", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_COUNT, + .sql = "SELECT COUNT(*) FROM digests;", + .args = "", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_EXPIRE, + .sql = "DELETE FROM digests WHERE id IN (SELECT id FROM digests WHERE time < ?1 LIMIT ?2);", + .args = "II", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_VACUUM, + .sql = "VACUUM;", + .args = "", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + .sql = "DELETE FROM shingles WHERE value=?1 AND number=?2;", + .args = "II", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_ADD_SOURCE, + .sql = "INSERT OR IGNORE INTO sources(name, version, last) VALUES (?1, ?2, ?3);", + .args = "TII", + .stmt = NULL, + .result = SQLITE_DONE + }, + { + .idx = RSPAMD_FUZZY_BACKEND_VERSION, + .sql = "SELECT version FROM sources WHERE name=?1;", + .args = "T", + .stmt = NULL, + .result = SQLITE_ROW + }, + { + .idx = RSPAMD_FUZZY_BACKEND_SET_VERSION, + .sql = "INSERT OR REPLACE INTO sources (name, version, last) VALUES (?3, ?1, ?2);", + .args = "IIT", + .stmt = NULL, + .result = SQLITE_DONE + }, +}; + +static GQuark +rspamd_fuzzy_backend_sqlite_quark (void) +{ + return g_quark_from_static_string ("fuzzy-backend-sqlite"); +} + +static gboolean +rspamd_fuzzy_backend_sqlite_prepare_stmts (struct rspamd_fuzzy_backend_sqlite *bk, GError **err) +{ + int i; + + for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i ++) { + if (prepared_stmts[i].stmt != NULL) { + /* Skip already prepared statements */ + continue; + } + if (sqlite3_prepare_v2 (bk->db, prepared_stmts[i].sql, -1, + &prepared_stmts[i].stmt, NULL) != SQLITE_OK) { + g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (), + -1, "Cannot initialize prepared sql `%s`: %s", + prepared_stmts[i].sql, sqlite3_errmsg (bk->db)); + + return FALSE; + } + } + + return TRUE; +} + +static int +rspamd_fuzzy_backend_sqlite_cleanup_stmt (struct rspamd_fuzzy_backend_sqlite *backend, + int idx) +{ + sqlite3_stmt *stmt; + + if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) { + + return -1; + } + + msg_debug_fuzzy_backend ("resetting `%s`", prepared_stmts[idx].sql); + stmt = prepared_stmts[idx].stmt; + sqlite3_clear_bindings (stmt); + sqlite3_reset (stmt); + + return SQLITE_OK; +} + +static int +rspamd_fuzzy_backend_sqlite_run_stmt (struct rspamd_fuzzy_backend_sqlite *backend, + gboolean auto_cleanup, + int idx, ...) +{ + int retcode; + va_list ap; + sqlite3_stmt *stmt; + int i; + const char *argtypes; + guint retries = 0; + struct timespec ts; + + if (idx < 0 || idx >= RSPAMD_FUZZY_BACKEND_MAX) { + + return -1; + } + + stmt = prepared_stmts[idx].stmt; + g_assert ((int)prepared_stmts[idx].idx == idx); + + if (stmt == NULL) { + if ((retcode = sqlite3_prepare_v2 (backend->db, prepared_stmts[idx].sql, -1, + &prepared_stmts[idx].stmt, NULL)) != SQLITE_OK) { + msg_err_fuzzy_backend ("Cannot initialize prepared sql `%s`: %s", + prepared_stmts[idx].sql, sqlite3_errmsg (backend->db)); + + return retcode; + } + stmt = prepared_stmts[idx].stmt; + } + + msg_debug_fuzzy_backend ("executing `%s` %s auto cleanup", + prepared_stmts[idx].sql, auto_cleanup ? "with" : "without"); + argtypes = prepared_stmts[idx].args; + sqlite3_clear_bindings (stmt); + sqlite3_reset (stmt); + va_start (ap, idx); + + for (i = 0; argtypes[i] != '\0'; i++) { + switch (argtypes[i]) { + case 'T': + sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), -1, + SQLITE_STATIC); + break; + case 'I': + sqlite3_bind_int64 (stmt, i + 1, va_arg (ap, gint64)); + break; + case 'S': + sqlite3_bind_int (stmt, i + 1, va_arg (ap, gint)); + break; + case 'D': + /* Special case for digests variable */ + sqlite3_bind_text (stmt, i + 1, va_arg (ap, const char*), 64, + SQLITE_STATIC); + break; + } + } + + va_end (ap); + +retry: + retcode = sqlite3_step (stmt); + + if (retcode == prepared_stmts[idx].result) { + retcode = SQLITE_OK; + } + else { + if ((retcode == SQLITE_BUSY || + retcode == SQLITE_LOCKED) && retries++ < max_retries) { + double_to_ts (sql_sleep_time, &ts); + nanosleep (&ts, NULL); + goto retry; + } + + msg_debug_fuzzy_backend ("failed to execute query %s: %d, %s", prepared_stmts[idx].sql, + retcode, sqlite3_errmsg (backend->db)); + } + + if (auto_cleanup) { + sqlite3_clear_bindings (stmt); + sqlite3_reset (stmt); + } + + return retcode; +} + +static void +rspamd_fuzzy_backend_sqlite_close_stmts (struct rspamd_fuzzy_backend_sqlite *bk) +{ + int i; + + for (i = 0; i < RSPAMD_FUZZY_BACKEND_MAX; i++) { + if (prepared_stmts[i].stmt != NULL) { + sqlite3_finalize (prepared_stmts[i].stmt); + prepared_stmts[i].stmt = NULL; + } + } + + return; +} + +static gboolean +rspamd_fuzzy_backend_sqlite_run_sql (const gchar *sql, struct rspamd_fuzzy_backend_sqlite *bk, + GError **err) +{ + guint retries = 0; + struct timespec ts; + gint ret; + + do { + ret = sqlite3_exec (bk->db, sql, NULL, NULL, NULL); + double_to_ts (sql_sleep_time, &ts); + } while (ret == SQLITE_BUSY && retries++ < max_retries && + nanosleep (&ts, NULL) == 0); + + if (ret != SQLITE_OK) { + g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (), + -1, "Cannot execute raw sql `%s`: %s", + sql, sqlite3_errmsg (bk->db)); + return FALSE; + } + + return TRUE; +} + +static struct rspamd_fuzzy_backend_sqlite * +rspamd_fuzzy_backend_sqlite_open_db (const gchar *path, GError **err) +{ + struct rspamd_fuzzy_backend_sqlite *bk; + rspamd_cryptobox_hash_state_t st; + guchar hash_out[rspamd_cryptobox_HASHBYTES]; + + g_assert (path != NULL); + + bk = g_malloc0 (sizeof (*bk)); + bk->path = g_strdup (path); + bk->expired = 0; + bk->pool = rspamd_mempool_new (rspamd_mempool_suggest_size (), + "fuzzy_backend", 0); + bk->db = rspamd_sqlite3_open_or_create (bk->pool, bk->path, + create_tables_sql, 1, err); + + if (bk->db == NULL) { + rspamd_fuzzy_backend_sqlite_close (bk); + + return NULL; + } + + if (!rspamd_fuzzy_backend_sqlite_prepare_stmts (bk, err)) { + rspamd_fuzzy_backend_sqlite_close (bk); + + return NULL; + } + + /* Set id for the backend */ + rspamd_cryptobox_hash_init (&st, NULL, 0); + rspamd_cryptobox_hash_update (&st, path, strlen (path)); + rspamd_cryptobox_hash_final (&st, hash_out); + rspamd_snprintf (bk->id, sizeof (bk->id), "%xs", hash_out); + memcpy (bk->pool->tag.uid, bk->id, sizeof (bk->pool->tag.uid)); + + return bk; +} + +struct rspamd_fuzzy_backend_sqlite * +rspamd_fuzzy_backend_sqlite_open (const gchar *path, + gboolean vacuum, + GError **err) +{ + struct rspamd_fuzzy_backend_sqlite *backend; + + if (path == NULL) { + g_set_error (err, rspamd_fuzzy_backend_sqlite_quark (), + ENOENT, "Path has not been specified"); + return NULL; + } + + /* Open database */ + if ((backend = rspamd_fuzzy_backend_sqlite_open_db (path, err)) == NULL) { + return NULL; + } + + if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, RSPAMD_FUZZY_BACKEND_COUNT) + == SQLITE_OK) { + backend->count = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT); + + return backend; +} + +static gint +rspamd_fuzzy_backend_sqlite_int64_cmp (const void *a, const void *b) +{ + gint64 ia = *(gint64 *)a, ib = *(gint64 *)b; + + return (ia - ib); +} + +struct rspamd_fuzzy_reply +rspamd_fuzzy_backend_sqlite_check (struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd, gint64 expire) +{ + struct rspamd_fuzzy_reply rep; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + int rc; + gint64 timestamp; + gint64 shingle_values[RSPAMD_SHINGLE_SIZE], i, sel_id, cur_id, + cur_cnt, max_cnt; + + memset (&rep, 0, sizeof (rep)); + memcpy (rep.digest, cmd->digest, sizeof (rep.digest)); + + if (backend == NULL) { + return rep; + } + + /* Try direct match first of all */ + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK, + cmd->digest); + + if (rc == SQLITE_OK) { + timestamp = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 1); + if (time (NULL) - timestamp > expire) { + /* Expire element */ + msg_debug_fuzzy_backend ("requested hash has been expired"); + } + else { + rep.v1.value = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 0); + rep.v1.prob = 1.0; + rep.v1.flag = sqlite3_column_int ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, 2); + } + } + else if (cmd->shingles_count > 0) { + /* Fuzzy match */ + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *)cmd; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE, + shcmd->sgl.hashes[i], i); + if (rc == SQLITE_OK) { + shingle_values[i] = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE].stmt, + 0); + } + else { + shingle_values[i] = -1; + } + msg_debug_fuzzy_backend ("looking for shingle %L -> %L: %d", i, + shcmd->sgl.hashes[i], rc); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, + RSPAMD_FUZZY_BACKEND_CHECK_SHINGLE); + + qsort (shingle_values, RSPAMD_SHINGLE_SIZE, sizeof (gint64), + rspamd_fuzzy_backend_sqlite_int64_cmp); + sel_id = -1; + cur_id = -1; + cur_cnt = 0; + max_cnt = 0; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i ++) { + if (shingle_values[i] == -1) { + continue; + } + + /* We have some value here, so we need to check it */ + if (shingle_values[i] == cur_id) { + cur_cnt ++; + } + else { + cur_id = shingle_values[i]; + if (cur_cnt >= max_cnt) { + max_cnt = cur_cnt; + sel_id = cur_id; + } + cur_cnt = 0; + } + } + + if (cur_cnt > max_cnt) { + max_cnt = cur_cnt; + } + + if (sel_id != -1) { + /* We have some id selected here */ + rep.v1.prob = (float)max_cnt / (float)RSPAMD_SHINGLE_SIZE; + + if (rep.v1.prob > 0.5) { + msg_debug_fuzzy_backend ( + "found fuzzy hash with probability %.2f", + rep.v1.prob); + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID, sel_id); + if (rc == SQLITE_OK) { + timestamp = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 2); + if (time (NULL) - timestamp > expire) { + /* Expire element */ + msg_debug_fuzzy_backend ( + "requested hash has been expired"); + rep.v1.prob = 0.0; + } + else { + rep.ts = timestamp; + memcpy (rep.digest, sqlite3_column_blob ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 0), sizeof (rep.digest)); + rep.v1.value = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 1); + rep.v1.flag = sqlite3_column_int ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID].stmt, + 3); + } + } + } + else { + /* Otherwise we assume that as error */ + rep.v1.value = 0; + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, + RSPAMD_FUZZY_BACKEND_GET_DIGEST_BY_ID); + } + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + return rep; +} + +gboolean +rspamd_fuzzy_backend_sqlite_prepare_update (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source) +{ + gint rc; + + if (backend == NULL) { + return FALSE; + } + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot start transaction for updates: %s", + sqlite3_errmsg (backend->db)); + return FALSE; + } + + return TRUE; +} + +gboolean +rspamd_fuzzy_backend_sqlite_add (struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd) +{ + int rc, i; + gint64 id, flag; + const struct rspamd_fuzzy_shingle_cmd *shcmd; + + if (backend == NULL) { + return FALSE; + } + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK, + cmd->digest); + + if (rc == SQLITE_OK) { + /* Check flag */ + flag = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_CHECK].stmt, + 2); + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + + if (flag == cmd->flag) { + /* We need to increase weight */ + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_UPDATE, + (gint64) cmd->value, + cmd->digest); + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot update hash to %d -> " + "%*xs: %s", (gint) cmd->flag, + (gint) sizeof (cmd->digest), cmd->digest, + sqlite3_errmsg (backend->db)); + } + } + else { + /* We need to relearn actually */ + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_UPDATE_FLAG, + (gint64) cmd->value, + (gint64) cmd->flag, + cmd->digest); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot update hash to %d -> " + "%*xs: %s", (gint) cmd->flag, + (gint) sizeof (cmd->digest), cmd->digest, + sqlite3_errmsg (backend->db)); + } + } + } + else { + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_INSERT, + (gint) cmd->flag, + cmd->digest, + (gint64) cmd->value); + + if (rc == SQLITE_OK) { + if (cmd->shingles_count > 0) { + id = sqlite3_last_insert_rowid (backend->db); + shcmd = (const struct rspamd_fuzzy_shingle_cmd *) cmd; + + for (i = 0; i < RSPAMD_SHINGLE_SIZE; i++) { + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_INSERT_SHINGLE, + shcmd->sgl.hashes[i], (gint64)i, id); + msg_debug_fuzzy_backend ("add shingle %d -> %L: %L", + i, + shcmd->sgl.hashes[i], + id); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot add shingle %d -> " + "%L: %L: %s", i, + shcmd->sgl.hashes[i], + id, sqlite3_errmsg (backend->db)); + } + } + } + } + else { + msg_warn_fuzzy_backend ("cannot add hash to %d -> " + "%*xs: %s", (gint)cmd->flag, + (gint)sizeof (cmd->digest), cmd->digest, + sqlite3_errmsg (backend->db)); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, + RSPAMD_FUZZY_BACKEND_INSERT); + } + + return (rc == SQLITE_OK); +} + +gboolean +rspamd_fuzzy_backend_sqlite_finish_update (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source, gboolean version_bump) +{ + gint rc = SQLITE_OK, wal_frames, wal_checkpointed, ver; + + /* Get and update version */ + if (version_bump) { + ver = rspamd_fuzzy_backend_sqlite_version (backend, source); + ++ver; + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_SET_VERSION, + (gint64)ver, (gint64)time (NULL), source); + } + + if (rc == SQLITE_OK) { + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot commit updates: %s", + sqlite3_errmsg (backend->db)); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + return FALSE; + } + else { + if (!rspamd_sqlite3_sync (backend->db, &wal_frames, &wal_checkpointed)) { + msg_warn_fuzzy_backend ("cannot commit checkpoint: %s", + sqlite3_errmsg (backend->db)); + } + else if (wal_checkpointed > 0) { + msg_info_fuzzy_backend ("total number of frames in the wal file: " + "%d, checkpointed: %d", wal_frames, wal_checkpointed); + } + } + } + else { + msg_warn_fuzzy_backend ("cannot update version for %s: %s", source, + sqlite3_errmsg (backend->db)); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + return FALSE; + } + + return TRUE; +} + +gboolean +rspamd_fuzzy_backend_sqlite_del (struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd) +{ + int rc = -1; + + if (backend == NULL) { + return FALSE; + } + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_CHECK, + cmd->digest); + + if (rc == SQLITE_OK) { + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_DELETE, + cmd->digest); + if (rc != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot update hash to %d -> " + "%*xs: %s", (gint) cmd->flag, + (gint) sizeof (cmd->digest), cmd->digest, + sqlite3_errmsg (backend->db)); + } + } + else { + /* Hash is missing */ + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_CHECK); + } + + return (rc == SQLITE_OK); +} + +gboolean +rspamd_fuzzy_backend_sqlite_sync (struct rspamd_fuzzy_backend_sqlite *backend, + gint64 expire, + gboolean clean_orphaned) +{ + struct orphaned_shingle_elt { + gint64 value; + gint64 number; + }; + + /* Do not do more than 5k ops per step */ + const guint64 max_changes = 5000; + gboolean ret = FALSE; + gint64 expire_lim, expired; + gint rc, i, orphaned_cnt = 0; + GError *err = NULL; + static const gchar orphaned_shingles[] = "SELECT shingles.value,shingles.number " + "FROM shingles " + "LEFT JOIN digests ON " + "shingles.digest_id=digests.id WHERE " + "digests.id IS NULL;"; + sqlite3_stmt *stmt; + GArray *orphaned; + struct orphaned_shingle_elt orphaned_elt, *pelt; + + + if (backend == NULL) { + return FALSE; + } + + /* Perform expire */ + if (expire > 0) { + expire_lim = time (NULL) - expire; + + if (expire_lim > 0) { + ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + + if (ret == SQLITE_OK) { + + rc = rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_EXPIRE, expire_lim, max_changes); + + if (rc == SQLITE_OK) { + expired = sqlite3_changes (backend->db); + + if (expired > 0) { + backend->expired += expired; + msg_info_fuzzy_backend ("expired %L hashes", expired); + } + } + else { + msg_warn_fuzzy_backend ( + "cannot execute expired statement: %s", + sqlite3_errmsg (backend->db)); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, + RSPAMD_FUZZY_BACKEND_EXPIRE); + + ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (ret != SQLITE_OK) { + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + } + } + if (ret != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot expire db: %s", + sqlite3_errmsg (backend->db)); + } + } + } + + /* Cleanup database */ + if (clean_orphaned) { + ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_START); + + if (ret == SQLITE_OK) { + if ((rc = sqlite3_prepare_v2 (backend->db, + orphaned_shingles, + -1, + &stmt, + NULL)) != SQLITE_OK) { + msg_warn_fuzzy_backend ("cannot cleanup shingles: %s", + sqlite3_errmsg (backend->db)); + } + else { + orphaned = g_array_new (FALSE, + FALSE, + sizeof (struct orphaned_shingle_elt)); + + while (sqlite3_step (stmt) == SQLITE_ROW) { + orphaned_elt.value = sqlite3_column_int64 (stmt, 0); + orphaned_elt.number = sqlite3_column_int64 (stmt, 1); + g_array_append_val (orphaned, orphaned_elt); + + if (orphaned->len > max_changes) { + break; + } + } + + sqlite3_finalize (stmt); + orphaned_cnt = orphaned->len; + + if (orphaned_cnt > 0) { + msg_info_fuzzy_backend ( + "going to delete %ud orphaned shingles", + orphaned_cnt); + /* Need to delete orphaned elements */ + for (i = 0; i < (gint) orphaned_cnt; i++) { + pelt = &g_array_index (orphaned, + struct orphaned_shingle_elt, + i); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_DELETE_ORPHANED, + pelt->value, pelt->number); + } + } + + + g_array_free (orphaned, TRUE); + } + + ret = rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_COMMIT); + + if (ret == SQLITE_OK) { + msg_info_fuzzy_backend ( + "deleted %ud orphaned shingles", + orphaned_cnt); + } + else { + msg_warn_fuzzy_backend ( + "cannot synchronize fuzzy backend: %e", + err); + rspamd_fuzzy_backend_sqlite_run_stmt (backend, TRUE, + RSPAMD_FUZZY_BACKEND_TRANSACTION_ROLLBACK); + } + } + } + + return ret; +} + + +void +rspamd_fuzzy_backend_sqlite_close (struct rspamd_fuzzy_backend_sqlite *backend) +{ + if (backend != NULL) { + if (backend->db != NULL) { + rspamd_fuzzy_backend_sqlite_close_stmts (backend); + sqlite3_close (backend->db); + } + + if (backend->path != NULL) { + g_free (backend->path); + } + + if (backend->pool) { + rspamd_mempool_delete (backend->pool); + } + + g_free (backend); + } +} + + +gsize +rspamd_fuzzy_backend_sqlite_count (struct rspamd_fuzzy_backend_sqlite *backend) +{ + if (backend) { + if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_COUNT) == SQLITE_OK) { + backend->count = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_COUNT].stmt, 0); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_COUNT); + + return backend->count; + } + + return 0; +} + +gint +rspamd_fuzzy_backend_sqlite_version (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source) +{ + gint ret = 0; + + if (backend) { + if (rspamd_fuzzy_backend_sqlite_run_stmt (backend, FALSE, + RSPAMD_FUZZY_BACKEND_VERSION, source) == SQLITE_OK) { + ret = sqlite3_column_int64 ( + prepared_stmts[RSPAMD_FUZZY_BACKEND_VERSION].stmt, 0); + } + + rspamd_fuzzy_backend_sqlite_cleanup_stmt (backend, RSPAMD_FUZZY_BACKEND_VERSION); + } + + return ret; +} + +gsize +rspamd_fuzzy_backend_sqlite_expired (struct rspamd_fuzzy_backend_sqlite *backend) +{ + return backend != NULL ? backend->expired : 0; +} + +const gchar * +rspamd_fuzzy_sqlite_backend_id (struct rspamd_fuzzy_backend_sqlite *backend) +{ + return backend != NULL ? backend->id : 0; +} diff --git a/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h new file mode 100644 index 000000000..33dc94f30 --- /dev/null +++ b/src/libserver/fuzzy_backend/fuzzy_backend_sqlite.h @@ -0,0 +1,107 @@ +/*- + * Copyright 2016 Vsevolod Stakhov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef FUZZY_BACKEND_H_ +#define FUZZY_BACKEND_H_ + +#include "config.h" +#include "fuzzy_wire.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct rspamd_fuzzy_backend_sqlite; + +/** + * Open fuzzy backend + * @param path file to open (legacy file will be converted automatically) + * @param err error pointer + * @return backend structure or NULL + */ +struct rspamd_fuzzy_backend_sqlite *rspamd_fuzzy_backend_sqlite_open (const gchar *path, + gboolean vacuum, + GError **err); + +/** + * Check specified fuzzy in the backend + * @param backend + * @param cmd + * @return reply with probability and weight + */ +struct rspamd_fuzzy_reply rspamd_fuzzy_backend_sqlite_check ( + struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd, + gint64 expire); + +/** + * Prepare storage for updates (by starting transaction) + */ +gboolean rspamd_fuzzy_backend_sqlite_prepare_update (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source); + +/** + * Add digest to the database + * @param backend + * @param cmd + * @return + */ +gboolean rspamd_fuzzy_backend_sqlite_add (struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd); + +/** + * Delete digest from the database + * @param backend + * @param cmd + * @return + */ +gboolean rspamd_fuzzy_backend_sqlite_del ( + struct rspamd_fuzzy_backend_sqlite *backend, + const struct rspamd_fuzzy_cmd *cmd); + +/** + * Commit updates to storage + */ +gboolean rspamd_fuzzy_backend_sqlite_finish_update (struct rspamd_fuzzy_backend_sqlite *backend, + const gchar *source, gboolean version_bump); + +/** + * Sync storage + * @param backend + * @return + */ +gboolean rspamd_fuzzy_backend_sqlite_sync (struct rspamd_fuzzy_backend_sqlite *backend, + gint64 expire, + gboolean clean_orphaned); + +/** + * Close storage + * @param backend + */ +void rspamd_fuzzy_backend_sqlite_close (struct rspamd_fuzzy_backend_sqlite *backend); + +gsize rspamd_fuzzy_backend_sqlite_count (struct rspamd_fuzzy_backend_sqlite *backend); + +gint rspamd_fuzzy_backend_sqlite_version (struct rspamd_fuzzy_backend_sqlite *backend, const gchar *source); + +gsize rspamd_fuzzy_backend_sqlite_expired (struct rspamd_fuzzy_backend_sqlite *backend); + +const gchar *rspamd_fuzzy_sqlite_backend_id (struct rspamd_fuzzy_backend_sqlite *backend); + +#ifdef __cplusplus +} +#endif + +#endif /* FUZZY_BACKEND_H_ */ |