diff options
-rw-r--r-- | src/libstat/backends/redis_backend.cxx | 721 |
1 files changed, 0 insertions, 721 deletions
diff --git a/src/libstat/backends/redis_backend.cxx b/src/libstat/backends/redis_backend.cxx index fe38b52bc..f3104967b 100644 --- a/src/libstat/backends/redis_backend.cxx +++ b/src/libstat/backends/redis_backend.cxx @@ -389,727 +389,6 @@ gsize rspamd_redis_expand_object(const gchar *pattern, return tlen; } - -#if 0 -// Leave it unless the conversion is done, to use as a reference -static rspamd_fstring_t * -rspamd_redis_tokens_to_query(struct rspamd_task *task, - struct redis_stat_runtime *rt, - GPtrArray *tokens, - const gchar *command, - const gchar *prefix, - gboolean learn, - gint idx, - gboolean intvals) -{ - rspamd_fstring_t *out; - rspamd_token_t *tok; - gchar n0[512], n1[64]; - guint i, l0, l1, cmd_len, prefix_len; - gint ret; - - g_assert(tokens != nullptr); - - cmd_len = strlen(command); - prefix_len = strlen(prefix); - out = rspamd_fstring_sized_new(1024); - - if (learn) { - rspamd_printf_fstring(&out, "*1\r\n$5\r\nMULTI\r\n"); - - ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, - out->str, out->len); - - if (ret != REDIS_OK) { - msg_err_task("call to redis failed: %s", rt->redis->errstr); - rspamd_fstring_free(out); - - return nullptr; - } - - out->len = 0; - } - else { - if (rt->ctx->new_schema) { - /* Multi + HGET */ - rspamd_printf_fstring(&out, "*1\r\n$5\r\nMULTI\r\n"); - - ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, - out->str, out->len); - - if (ret != REDIS_OK) { - msg_err_task("call to redis failed: %s", rt->redis->errstr); - rspamd_fstring_free(out); - - return nullptr; - } - - out->len = 0; - } - else { - rspamd_printf_fstring(&out, "" - "*%d\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n", - (tokens->len + 2), - cmd_len, command, - prefix_len, prefix); - } - } - - for (i = 0; i < tokens->len; i++) { - tok = g_ptr_array_index(tokens, i); - - if (learn) { - if (intvals) { - l1 = rspamd_snprintf(n1, sizeof(n1), "%L", - (gint64) tok->values[idx]); - } - else { - l1 = rspamd_snprintf(n1, sizeof(n1), "%f", - tok->values[idx]); - } - - if (rt->ctx->new_schema) { - /* - * HINCRBY <prefix_token> <0|1> <value> - */ - l0 = rspamd_snprintf(n0, sizeof(n0), "%*s_%uL", - prefix_len, prefix, - tok->data); - - rspamd_printf_fstring(&out, "" - "*4\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n", - cmd_len, command, - l0, n0, - 1, rt->stcf->is_spam ? "S" : "H", - l1, n1); - } - else { - l0 = rspamd_snprintf(n0, sizeof(n0), "%uL", tok->data); - - /* - * HINCRBY <prefix> <token> <value> - */ - rspamd_printf_fstring(&out, "" - "*4\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n", - cmd_len, command, - prefix_len, prefix, - l0, n0, - l1, n1); - } - - ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, - out->str, out->len); - - if (ret != REDIS_OK) { - msg_err_task("call to redis failed: %s", rt->redis->errstr); - rspamd_fstring_free(out); - - return nullptr; - } - - if (rt->ctx->store_tokens) { - - if (!rt->ctx->new_schema) { - /* - * We store tokens in form - * HSET prefix_tokens <token_id> "token_string" - * ZINCRBY prefix_z 1.0 <token_id> - */ - if (tok->t1 && tok->t2) { - redisAsyncCommand(rt->redis, nullptr, nullptr, - "HSET %b_tokens %b %b:%b", - prefix, (size_t) prefix_len, - n0, (size_t) l0, - tok->t1->stemmed.begin, tok->t1->stemmed.len, - tok->t2->stemmed.begin, tok->t2->stemmed.len); - } - else if (tok->t1) { - redisAsyncCommand(rt->redis, nullptr, nullptr, - "HSET %b_tokens %b %b", - prefix, (size_t) prefix_len, - n0, (size_t) l0, - tok->t1->stemmed.begin, - tok->t1->stemmed.len); - } - } - else { - /* - * We store tokens in form - * HSET <token_id> "tokens" "token_string" - * ZINCRBY prefix_z 1.0 <token_id> - */ - if (tok->t1 && tok->t2) { - redisAsyncCommand(rt->redis, nullptr, nullptr, - "HSET %b %s %b:%b", - n0, (size_t) l0, - "tokens", - tok->t1->stemmed.begin, tok->t1->stemmed.len, - tok->t2->stemmed.begin, tok->t2->stemmed.len); - } - else if (tok->t1) { - redisAsyncCommand(rt->redis, nullptr, nullptr, - "HSET %b %s %b", - n0, (size_t) l0, - "tokens", - tok->t1->stemmed.begin, tok->t1->stemmed.len); - } - } - - redisAsyncCommand(rt->redis, nullptr, nullptr, - "ZINCRBY %b_z %b %b", - prefix, (size_t) prefix_len, - n1, (size_t) l1, - n0, (size_t) l0); - } - - if (rt->ctx->new_schema && rt->ctx->expiry > 0) { - out->len = 0; - l1 = rspamd_snprintf(n1, sizeof(n1), "%d", - rt->ctx->expiry); - - rspamd_printf_fstring(&out, "" - "*3\r\n" - "$6\r\n" - "EXPIRE\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n", - l0, n0, - l1, n1); - redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, - out->str, out->len); - } - - out->len = 0; - } - else { - if (rt->ctx->new_schema) { - l0 = rspamd_snprintf(n0, sizeof(n0), "%*s_%uL", - prefix_len, prefix, - tok->data); - - rspamd_printf_fstring(&out, "" - "*3\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n", - cmd_len, command, - l0, n0, - 1, rt->stcf->is_spam ? "S" : "H"); - - ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, - out->str, out->len); - - if (ret != REDIS_OK) { - msg_err_task("call to redis failed: %s", rt->redis->errstr); - rspamd_fstring_free(out); - - return nullptr; - } - - out->len = 0; - } - else { - l0 = rspamd_snprintf(n0, sizeof(n0), "%uL", tok->data); - rspamd_printf_fstring(&out, "" - "$%d\r\n" - "%s\r\n", - l0, n0); - } - } - } - - if (!learn && rt->ctx->new_schema) { - rspamd_printf_fstring(&out, "*1\r\n$4\r\nEXEC\r\n"); - } - - return out; -} - - -static void -rspamd_redis_store_stat_signature(struct rspamd_task *task, - struct redis_stat_runtime *rt, - GPtrArray *tokens, - const gchar *prefix) -{ - gchar *sig, keybuf[512], nbuf[64]; - rspamd_token_t *tok; - guint i, blen, klen; - rspamd_fstring_t *out; - - sig = rspamd_mempool_get_variable(task->task_pool, - RSPAMD_MEMPOOL_STAT_SIGNATURE); - - if (sig == nullptr) { - msg_err_task("cannot get bayes signature"); - return; - } - - out = rspamd_fstring_sized_new(1024); - klen = rspamd_snprintf(keybuf, sizeof(keybuf), "%s_%s_%s", - prefix, sig, rt->stcf->is_spam ? "S" : "H"); - - /* Cleanup key */ - rspamd_printf_fstring(&out, "" - "*2\r\n" - "$3\r\n" - "DEL\r\n" - "$%d\r\n" - "%s\r\n", - klen, keybuf); - redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, - out->str, out->len); - out->len = 0; - - rspamd_printf_fstring(&out, "" - "*%d\r\n" - "$5\r\n" - "LPUSH\r\n" - "$%d\r\n" - "%s\r\n", - tokens->len + 2, - klen, keybuf); - - PTR_ARRAY_FOREACH(tokens, i, tok) - { - blen = rspamd_snprintf(nbuf, sizeof(nbuf), "%uL", tok->data); - rspamd_printf_fstring(&out, "" - "$%d\r\n" - "%s\r\n", - blen, nbuf); - } - - redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, - out->str, out->len); - out->len = 0; - - if (rt->ctx->expiry > 0) { - out->len = 0; - blen = rspamd_snprintf(nbuf, sizeof(nbuf), "%d", - rt->ctx->expiry); - - rspamd_printf_fstring(&out, "" - "*3\r\n" - "$6\r\n" - "EXPIRE\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n", - klen, keybuf, - blen, nbuf); - redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, - out->str, out->len); - } - - rspamd_fstring_free(out); -} - -static void -rspamd_redis_async_cbdata_cleanup(struct rspamd_redis_stat_cbdata *cbdata) -{ - guint i; - gchar *k; - - if (cbdata && !cbdata->wanna_die) { - /* Avoid double frees */ - cbdata->wanna_die = TRUE; - redisAsyncFree(cbdata->redis); - - for (i = 0; i < cbdata->cur_keys->len; i++) { - k = g_ptr_array_index(cbdata->cur_keys, i); - g_free(k); - } - - g_ptr_array_free(cbdata->cur_keys, TRUE); - - if (cbdata->elt) { - cbdata->elt->cbdata = nullptr; - /* Re-enable parent event */ - cbdata->elt->async->enabled = TRUE; - - /* Replace ucl object */ - if (cbdata->cur) { - if (cbdata->elt->stat) { - ucl_object_unref(cbdata->elt->stat); - } - - cbdata->elt->stat = cbdata->cur; - cbdata->cur = nullptr; - } - } - - if (cbdata->cur) { - ucl_object_unref(cbdata->cur); - } - - g_free(cbdata); - } -} - -/* Called when we get number of learns for a specific key */ -static void -rspamd_redis_stat_learns(redisAsyncContext *c, gpointer r, gpointer priv) -{ - struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *) priv; - struct rspamd_redis_stat_cbdata *cbdata; - redisReply *reply = r; - ucl_object_t *obj; - gulong num = 0; - - cbdata = redis_elt->cbdata; - - if (cbdata == nullptr || cbdata->wanna_die) { - return; - } - - cbdata->inflight--; - - if (c->err == 0 && r != nullptr) { - if (G_LIKELY(reply->type == REDIS_REPLY_INTEGER)) { - num = reply->integer; - } - else if (reply->type == REDIS_REPLY_STRING) { - rspamd_strtoul(reply->str, reply->len, &num); - } - - obj = (ucl_object_t *) ucl_object_lookup(cbdata->cur, "revision"); - if (obj) { - obj->value.iv += num; - } - } - - if (cbdata->inflight == 0) { - rspamd_redis_async_cbdata_cleanup(cbdata); - redis_elt->cbdata = nullptr; - } -} - -/* Called when we get number of elements for a specific key */ -static void -rspamd_redis_stat_key(redisAsyncContext *c, gpointer r, gpointer priv) -{ - struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *) priv; - struct rspamd_redis_stat_cbdata *cbdata; - redisReply *reply = r; - ucl_object_t *obj; - glong num = 0; - - cbdata = redis_elt->cbdata; - - if (cbdata == nullptr || cbdata->wanna_die) { - return; - } - - cbdata->inflight--; - - if (c->err == 0 && r != nullptr) { - if (G_LIKELY(reply->type == REDIS_REPLY_INTEGER)) { - num = reply->integer; - } - else if (reply->type == REDIS_REPLY_STRING) { - rspamd_strtol(reply->str, reply->len, &num); - } - - if (num < 0) { - msg_err("bad learns count: %L", (gint64) num); - num = 0; - } - - obj = (ucl_object_t *) ucl_object_lookup(cbdata->cur, "used"); - if (obj) { - obj->value.iv += num; - } - - obj = (ucl_object_t *) ucl_object_lookup(cbdata->cur, "total"); - if (obj) { - obj->value.iv += num; - } - - obj = (ucl_object_t *) ucl_object_lookup(cbdata->cur, "size"); - if (obj) { - /* Size of key + size of int64_t */ - obj->value.iv += num * (sizeof(G_STRINGIFY(G_MAXINT64)) + - sizeof(guint64) + sizeof(gpointer)); - } - } - - if (cbdata->inflight == 0) { - rspamd_redis_async_cbdata_cleanup(cbdata); - redis_elt->cbdata = nullptr; - } -} - -/* Called when we have connected to the redis server and got keys to check */ -static void -rspamd_redis_stat_keys(redisAsyncContext *c, gpointer r, gpointer priv) -{ - struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *) priv; - struct rspamd_redis_stat_cbdata *cbdata; - redisReply *reply = r, *more_elt, *elts, *elt; - gchar **pk, *k; - guint i, processed = 0; - gboolean more = false; - - cbdata = redis_elt->cbdata; - - if (cbdata == nullptr || cbdata->wanna_die) { - return; - } - - cbdata->inflight--; - - if (c->err == 0 && r != nullptr) { - if (reply->type == REDIS_REPLY_ARRAY) { - more_elt = reply->element[0]; - elts = reply->element[1]; - - if (more_elt != nullptr && more_elt->str != nullptr && strcmp(more_elt->str, "0") != 0) { - more = true; - } - - /* Clear the existing stuff */ - PTR_ARRAY_FOREACH(cbdata->cur_keys, i, k) - { - if (k) { - g_free(k); - } - } - - g_ptr_array_set_size(cbdata->cur_keys, elts->elements); - - for (i = 0; i < elts->elements; i++) { - elt = elts->element[i]; - - if (elt->type == REDIS_REPLY_STRING) { - pk = (gchar **) &g_ptr_array_index(cbdata->cur_keys, i); - *pk = g_malloc(elt->len + 1); - rspamd_strlcpy(*pk, elt->str, elt->len + 1); - processed++; - } - else { - pk = (gchar **) &g_ptr_array_index(cbdata->cur_keys, i); - *pk = nullptr; - } - } - - if (processed) { - PTR_ARRAY_FOREACH(cbdata->cur_keys, i, k) - { - if (k) { - const gchar *learned_key = "learns"; - - if (cbdata->elt->ctx->new_schema) { - if (cbdata->elt->ctx->stcf->is_spam) { - learned_key = "learns_spam"; - } - else { - learned_key = "learns_ham"; - } - redisAsyncCommand(cbdata->redis, - rspamd_redis_stat_learns, - redis_elt, - "HGET %s %s", - k, learned_key); - cbdata->inflight += 1; - } - else { - redisAsyncCommand(cbdata->redis, - rspamd_redis_stat_key, - redis_elt, - "HLEN %s", - k); - redisAsyncCommand(cbdata->redis, - rspamd_redis_stat_learns, - redis_elt, - "HGET %s %s", - k, learned_key); - cbdata->inflight += 2; - } - } - } - } - } - - if (more) { - /* Get more stat keys */ - redisAsyncCommand(cbdata->redis, rspamd_redis_stat_keys, redis_elt, - "SSCAN %s_keys %s COUNT %d", - cbdata->elt->ctx->stcf->symbol, - more_elt->str, - cbdata->elt->ctx->max_users); - - cbdata->inflight += 1; - } - else { - /* Set up the required keys */ - ucl_object_insert_key(cbdata->cur, - ucl_object_typed_new(UCL_INT), "revision", 0, false); - ucl_object_insert_key(cbdata->cur, - ucl_object_typed_new(UCL_INT), "used", 0, false); - ucl_object_insert_key(cbdata->cur, - ucl_object_typed_new(UCL_INT), "total", 0, false); - ucl_object_insert_key(cbdata->cur, - ucl_object_typed_new(UCL_INT), "size", 0, false); - ucl_object_insert_key(cbdata->cur, - ucl_object_fromstring(cbdata->elt->ctx->stcf->symbol), - "symbol", 0, false); - ucl_object_insert_key(cbdata->cur, ucl_object_fromstring("redis"), - "type", 0, false); - ucl_object_insert_key(cbdata->cur, ucl_object_fromint(0), - "languages", 0, false); - ucl_object_insert_key(cbdata->cur, ucl_object_fromint(processed), - "users", 0, false); - - rspamd_upstream_ok(cbdata->selected); - - if (cbdata->inflight == 0) { - rspamd_redis_async_cbdata_cleanup(cbdata); - redis_elt->cbdata = nullptr; - } - } - } - else { - if (c->errstr) { - msg_err("cannot get keys to gather stat: %s", c->errstr); - } - else { - msg_err("cannot get keys to gather stat: unknown error"); - } - - rspamd_upstream_fail(cbdata->selected, FALSE, c->errstr); - rspamd_redis_async_cbdata_cleanup(cbdata); - redis_elt->cbdata = nullptr; - } -} - -static void -rspamd_redis_async_stat_cb(struct rspamd_stat_async_elt *elt, gpointer d) -{ - struct redis_stat_ctx *ctx; - struct rspamd_redis_stat_elt *redis_elt = elt->ud; - struct rspamd_redis_stat_cbdata *cbdata; - rspamd_inet_addr_t *addr; - struct upstream_list *ups; - redisAsyncContext *redis_ctx; - struct upstream *selected; - - g_assert(redis_elt != nullptr); - - ctx = redis_elt->ctx; - - if (redis_elt->cbdata) { - /* We have some other process pending */ - rspamd_redis_async_cbdata_cleanup(redis_elt->cbdata); - redis_elt->cbdata = nullptr; - } - - /* Disable further events unless needed */ - elt->enabled = FALSE; - - ups = rspamd_redis_get_servers(ctx, "read_servers"); - - if (!ups) { - return; - } - - selected = rspamd_upstream_get(ups, - RSPAMD_UPSTREAM_ROUND_ROBIN, - nullptr, - 0); - - g_assert(selected != nullptr); - addr = rspamd_upstream_addr_next(selected); - g_assert(addr != nullptr); - - if (rspamd_inet_address_get_af(addr) == AF_UNIX) { - redis_ctx = redisAsyncConnectUnix(rspamd_inet_address_to_string(addr)); - } - else { - redis_ctx = redisAsyncConnect(rspamd_inet_address_to_string(addr), - rspamd_inet_address_get_port(addr)); - } - - if (redis_ctx == nullptr) { - msg_warn("cannot connect to redis server %s: %s", - rspamd_inet_address_to_string_pretty(addr), - strerror(errno)); - - return; - } - else if (redis_ctx->err != REDIS_OK) { - msg_warn("cannot connect to redis server %s: %s", - rspamd_inet_address_to_string_pretty(addr), - redis_ctx->errstr); - redisAsyncFree(redis_ctx); - - return; - } - - redisLibevAttach(redis_elt->event_loop, redis_ctx); - cbdata = g_malloc0(sizeof(*cbdata)); - cbdata->redis = redis_ctx; - cbdata->selected = selected; - cbdata->inflight = 1; - cbdata->cur = ucl_object_typed_new(UCL_OBJECT); - cbdata->elt = redis_elt; - cbdata->cur_keys = g_ptr_array_sized_new(ctx->max_users); - redis_elt->cbdata = cbdata; - - /* XXX: deal with timeouts maybe */ - /* Get keys in redis that match our symbol */ - redisAsyncCommand(cbdata->redis, rspamd_redis_stat_keys, redis_elt, - "SSCAN %s_keys 0 COUNT %d", - ctx->stcf->symbol, - ctx->max_users); -} - -static void -rspamd_redis_async_stat_fin(struct rspamd_stat_async_elt *elt, gpointer d) -{ - struct rspamd_redis_stat_elt *redis_elt = elt->ud; - - if (redis_elt->cbdata != nullptr) { - rspamd_redis_async_cbdata_cleanup(redis_elt->cbdata); - redis_elt->cbdata = nullptr; - } - - /* Clear the static elements */ - if (redis_elt->stat) { - ucl_object_unref(redis_elt->stat); - redis_elt->stat = nullptr; - } - - g_free(redis_elt); -} - -#endif - static int rspamd_redis_stat_cb(lua_State *L) { |