return tlen;
}
-
-#if 0
-// Leave it unless the conversion is done, to use as a reference
-static rspamd_fstring_t *
-rspamd_redis_tokens_to_query(struct rspamd_task *task,
- struct redis_stat_runtime *rt,
- GPtrArray *tokens,
- const gchar *command,
- const gchar *prefix,
- gboolean learn,
- gint idx,
- gboolean intvals)
-{
- rspamd_fstring_t *out;
- rspamd_token_t *tok;
- gchar n0[512], n1[64];
- guint i, l0, l1, cmd_len, prefix_len;
- gint ret;
-
- g_assert(tokens != nullptr);
-
- cmd_len = strlen(command);
- prefix_len = strlen(prefix);
- out = rspamd_fstring_sized_new(1024);
-
- if (learn) {
- rspamd_printf_fstring(&out, "*1\r\n$5\r\nMULTI\r\n");
-
- ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr,
- out->str, out->len);
-
- if (ret != REDIS_OK) {
- msg_err_task("call to redis failed: %s", rt->redis->errstr);
- rspamd_fstring_free(out);
-
- return nullptr;
- }
-
- out->len = 0;
- }
- else {
- if (rt->ctx->new_schema) {
- /* Multi + HGET */
- rspamd_printf_fstring(&out, "*1\r\n$5\r\nMULTI\r\n");
-
- ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr,
- out->str, out->len);
-
- if (ret != REDIS_OK) {
- msg_err_task("call to redis failed: %s", rt->redis->errstr);
- rspamd_fstring_free(out);
-
- return nullptr;
- }
-
- out->len = 0;
- }
- else {
- rspamd_printf_fstring(&out, ""
- "*%d\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n",
- (tokens->len + 2),
- cmd_len, command,
- prefix_len, prefix);
- }
- }
-
- for (i = 0; i < tokens->len; i++) {
- tok = g_ptr_array_index(tokens, i);
-
- if (learn) {
- if (intvals) {
- l1 = rspamd_snprintf(n1, sizeof(n1), "%L",
- (gint64) tok->values[idx]);
- }
- else {
- l1 = rspamd_snprintf(n1, sizeof(n1), "%f",
- tok->values[idx]);
- }
-
- if (rt->ctx->new_schema) {
- /*
- * HINCRBY <prefix_token> <0|1> <value>
- */
- l0 = rspamd_snprintf(n0, sizeof(n0), "%*s_%uL",
- prefix_len, prefix,
- tok->data);
-
- rspamd_printf_fstring(&out, ""
- "*4\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n",
- cmd_len, command,
- l0, n0,
- 1, rt->stcf->is_spam ? "S" : "H",
- l1, n1);
- }
- else {
- l0 = rspamd_snprintf(n0, sizeof(n0), "%uL", tok->data);
-
- /*
- * HINCRBY <prefix> <token> <value>
- */
- rspamd_printf_fstring(&out, ""
- "*4\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n",
- cmd_len, command,
- prefix_len, prefix,
- l0, n0,
- l1, n1);
- }
-
- ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr,
- out->str, out->len);
-
- if (ret != REDIS_OK) {
- msg_err_task("call to redis failed: %s", rt->redis->errstr);
- rspamd_fstring_free(out);
-
- return nullptr;
- }
-
- if (rt->ctx->store_tokens) {
-
- if (!rt->ctx->new_schema) {
- /*
- * We store tokens in form
- * HSET prefix_tokens <token_id> "token_string"
- * ZINCRBY prefix_z 1.0 <token_id>
- */
- if (tok->t1 && tok->t2) {
- redisAsyncCommand(rt->redis, nullptr, nullptr,
- "HSET %b_tokens %b %b:%b",
- prefix, (size_t) prefix_len,
- n0, (size_t) l0,
- tok->t1->stemmed.begin, tok->t1->stemmed.len,
- tok->t2->stemmed.begin, tok->t2->stemmed.len);
- }
- else if (tok->t1) {
- redisAsyncCommand(rt->redis, nullptr, nullptr,
- "HSET %b_tokens %b %b",
- prefix, (size_t) prefix_len,
- n0, (size_t) l0,
- tok->t1->stemmed.begin,
- tok->t1->stemmed.len);
- }
- }
- else {
- /*
- * We store tokens in form
- * HSET <token_id> "tokens" "token_string"
- * ZINCRBY prefix_z 1.0 <token_id>
- */
- if (tok->t1 && tok->t2) {
- redisAsyncCommand(rt->redis, nullptr, nullptr,
- "HSET %b %s %b:%b",
- n0, (size_t) l0,
- "tokens",
- tok->t1->stemmed.begin, tok->t1->stemmed.len,
- tok->t2->stemmed.begin, tok->t2->stemmed.len);
- }
- else if (tok->t1) {
- redisAsyncCommand(rt->redis, nullptr, nullptr,
- "HSET %b %s %b",
- n0, (size_t) l0,
- "tokens",
- tok->t1->stemmed.begin, tok->t1->stemmed.len);
- }
- }
-
- redisAsyncCommand(rt->redis, nullptr, nullptr,
- "ZINCRBY %b_z %b %b",
- prefix, (size_t) prefix_len,
- n1, (size_t) l1,
- n0, (size_t) l0);
- }
-
- if (rt->ctx->new_schema && rt->ctx->expiry > 0) {
- out->len = 0;
- l1 = rspamd_snprintf(n1, sizeof(n1), "%d",
- rt->ctx->expiry);
-
- rspamd_printf_fstring(&out, ""
- "*3\r\n"
- "$6\r\n"
- "EXPIRE\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n",
- l0, n0,
- l1, n1);
- redisAsyncFormattedCommand(rt->redis, nullptr, nullptr,
- out->str, out->len);
- }
-
- out->len = 0;
- }
- else {
- if (rt->ctx->new_schema) {
- l0 = rspamd_snprintf(n0, sizeof(n0), "%*s_%uL",
- prefix_len, prefix,
- tok->data);
-
- rspamd_printf_fstring(&out, ""
- "*3\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n",
- cmd_len, command,
- l0, n0,
- 1, rt->stcf->is_spam ? "S" : "H");
-
- ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr,
- out->str, out->len);
-
- if (ret != REDIS_OK) {
- msg_err_task("call to redis failed: %s", rt->redis->errstr);
- rspamd_fstring_free(out);
-
- return nullptr;
- }
-
- out->len = 0;
- }
- else {
- l0 = rspamd_snprintf(n0, sizeof(n0), "%uL", tok->data);
- rspamd_printf_fstring(&out, ""
- "$%d\r\n"
- "%s\r\n",
- l0, n0);
- }
- }
- }
-
- if (!learn && rt->ctx->new_schema) {
- rspamd_printf_fstring(&out, "*1\r\n$4\r\nEXEC\r\n");
- }
-
- return out;
-}
-
-
-static void
-rspamd_redis_store_stat_signature(struct rspamd_task *task,
- struct redis_stat_runtime *rt,
- GPtrArray *tokens,
- const gchar *prefix)
-{
- gchar *sig, keybuf[512], nbuf[64];
- rspamd_token_t *tok;
- guint i, blen, klen;
- rspamd_fstring_t *out;
-
- sig = rspamd_mempool_get_variable(task->task_pool,
- RSPAMD_MEMPOOL_STAT_SIGNATURE);
-
- if (sig == nullptr) {
- msg_err_task("cannot get bayes signature");
- return;
- }
-
- out = rspamd_fstring_sized_new(1024);
- klen = rspamd_snprintf(keybuf, sizeof(keybuf), "%s_%s_%s",
- prefix, sig, rt->stcf->is_spam ? "S" : "H");
-
- /* Cleanup key */
- rspamd_printf_fstring(&out, ""
- "*2\r\n"
- "$3\r\n"
- "DEL\r\n"
- "$%d\r\n"
- "%s\r\n",
- klen, keybuf);
- redisAsyncFormattedCommand(rt->redis, nullptr, nullptr,
- out->str, out->len);
- out->len = 0;
-
- rspamd_printf_fstring(&out, ""
- "*%d\r\n"
- "$5\r\n"
- "LPUSH\r\n"
- "$%d\r\n"
- "%s\r\n",
- tokens->len + 2,
- klen, keybuf);
-
- PTR_ARRAY_FOREACH(tokens, i, tok)
- {
- blen = rspamd_snprintf(nbuf, sizeof(nbuf), "%uL", tok->data);
- rspamd_printf_fstring(&out, ""
- "$%d\r\n"
- "%s\r\n",
- blen, nbuf);
- }
-
- redisAsyncFormattedCommand(rt->redis, nullptr, nullptr,
- out->str, out->len);
- out->len = 0;
-
- if (rt->ctx->expiry > 0) {
- out->len = 0;
- blen = rspamd_snprintf(nbuf, sizeof(nbuf), "%d",
- rt->ctx->expiry);
-
- rspamd_printf_fstring(&out, ""
- "*3\r\n"
- "$6\r\n"
- "EXPIRE\r\n"
- "$%d\r\n"
- "%s\r\n"
- "$%d\r\n"
- "%s\r\n",
- klen, keybuf,
- blen, nbuf);
- redisAsyncFormattedCommand(rt->redis, nullptr, nullptr,
- out->str, out->len);
- }
-
- rspamd_fstring_free(out);
-}
-
-static void
-rspamd_redis_async_cbdata_cleanup(struct rspamd_redis_stat_cbdata *cbdata)
-{
- guint i;
- gchar *k;
-
- if (cbdata && !cbdata->wanna_die) {
- /* Avoid double frees */
- cbdata->wanna_die = TRUE;
- redisAsyncFree(cbdata->redis);
-
- for (i = 0; i < cbdata->cur_keys->len; i++) {
- k = g_ptr_array_index(cbdata->cur_keys, i);
- g_free(k);
- }
-
- g_ptr_array_free(cbdata->cur_keys, TRUE);
-
- if (cbdata->elt) {
- cbdata->elt->cbdata = nullptr;
- /* Re-enable parent event */
- cbdata->elt->async->enabled = TRUE;
-
- /* Replace ucl object */
- if (cbdata->cur) {
- if (cbdata->elt->stat) {
- ucl_object_unref(cbdata->elt->stat);
- }
-
- cbdata->elt->stat = cbdata->cur;
- cbdata->cur = nullptr;
- }
- }
-
- if (cbdata->cur) {
- ucl_object_unref(cbdata->cur);
- }
-
- g_free(cbdata);
- }
-}
-
-/* Called when we get number of learns for a specific key */
-static void
-rspamd_redis_stat_learns(redisAsyncContext *c, gpointer r, gpointer priv)
-{
- struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *) priv;
- struct rspamd_redis_stat_cbdata *cbdata;
- redisReply *reply = r;
- ucl_object_t *obj;
- gulong num = 0;
-
- cbdata = redis_elt->cbdata;
-
- if (cbdata == nullptr || cbdata->wanna_die) {
- return;
- }
-
- cbdata->inflight--;
-
- if (c->err == 0 && r != nullptr) {
- if (G_LIKELY(reply->type == REDIS_REPLY_INTEGER)) {
- num = reply->integer;
- }
- else if (reply->type == REDIS_REPLY_STRING) {
- rspamd_strtoul(reply->str, reply->len, &num);
- }
-
- obj = (ucl_object_t *) ucl_object_lookup(cbdata->cur, "revision");
- if (obj) {
- obj->value.iv += num;
- }
- }
-
- if (cbdata->inflight == 0) {
- rspamd_redis_async_cbdata_cleanup(cbdata);
- redis_elt->cbdata = nullptr;
- }
-}
-
-/* Called when we get number of elements for a specific key */
-static void
-rspamd_redis_stat_key(redisAsyncContext *c, gpointer r, gpointer priv)
-{
- struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *) priv;
- struct rspamd_redis_stat_cbdata *cbdata;
- redisReply *reply = r;
- ucl_object_t *obj;
- glong num = 0;
-
- cbdata = redis_elt->cbdata;
-
- if (cbdata == nullptr || cbdata->wanna_die) {
- return;
- }
-
- cbdata->inflight--;
-
- if (c->err == 0 && r != nullptr) {
- if (G_LIKELY(reply->type == REDIS_REPLY_INTEGER)) {
- num = reply->integer;
- }
- else if (reply->type == REDIS_REPLY_STRING) {
- rspamd_strtol(reply->str, reply->len, &num);
- }
-
- if (num < 0) {
- msg_err("bad learns count: %L", (gint64) num);
- num = 0;
- }
-
- obj = (ucl_object_t *) ucl_object_lookup(cbdata->cur, "used");
- if (obj) {
- obj->value.iv += num;
- }
-
- obj = (ucl_object_t *) ucl_object_lookup(cbdata->cur, "total");
- if (obj) {
- obj->value.iv += num;
- }
-
- obj = (ucl_object_t *) ucl_object_lookup(cbdata->cur, "size");
- if (obj) {
- /* Size of key + size of int64_t */
- obj->value.iv += num * (sizeof(G_STRINGIFY(G_MAXINT64)) +
- sizeof(guint64) + sizeof(gpointer));
- }
- }
-
- if (cbdata->inflight == 0) {
- rspamd_redis_async_cbdata_cleanup(cbdata);
- redis_elt->cbdata = nullptr;
- }
-}
-
-/* Called when we have connected to the redis server and got keys to check */
-static void
-rspamd_redis_stat_keys(redisAsyncContext *c, gpointer r, gpointer priv)
-{
- struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *) priv;
- struct rspamd_redis_stat_cbdata *cbdata;
- redisReply *reply = r, *more_elt, *elts, *elt;
- gchar **pk, *k;
- guint i, processed = 0;
- gboolean more = false;
-
- cbdata = redis_elt->cbdata;
-
- if (cbdata == nullptr || cbdata->wanna_die) {
- return;
- }
-
- cbdata->inflight--;
-
- if (c->err == 0 && r != nullptr) {
- if (reply->type == REDIS_REPLY_ARRAY) {
- more_elt = reply->element[0];
- elts = reply->element[1];
-
- if (more_elt != nullptr && more_elt->str != nullptr && strcmp(more_elt->str, "0") != 0) {
- more = true;
- }
-
- /* Clear the existing stuff */
- PTR_ARRAY_FOREACH(cbdata->cur_keys, i, k)
- {
- if (k) {
- g_free(k);
- }
- }
-
- g_ptr_array_set_size(cbdata->cur_keys, elts->elements);
-
- for (i = 0; i < elts->elements; i++) {
- elt = elts->element[i];
-
- if (elt->type == REDIS_REPLY_STRING) {
- pk = (gchar **) &g_ptr_array_index(cbdata->cur_keys, i);
- *pk = g_malloc(elt->len + 1);
- rspamd_strlcpy(*pk, elt->str, elt->len + 1);
- processed++;
- }
- else {
- pk = (gchar **) &g_ptr_array_index(cbdata->cur_keys, i);
- *pk = nullptr;
- }
- }
-
- if (processed) {
- PTR_ARRAY_FOREACH(cbdata->cur_keys, i, k)
- {
- if (k) {
- const gchar *learned_key = "learns";
-
- if (cbdata->elt->ctx->new_schema) {
- if (cbdata->elt->ctx->stcf->is_spam) {
- learned_key = "learns_spam";
- }
- else {
- learned_key = "learns_ham";
- }
- redisAsyncCommand(cbdata->redis,
- rspamd_redis_stat_learns,
- redis_elt,
- "HGET %s %s",
- k, learned_key);
- cbdata->inflight += 1;
- }
- else {
- redisAsyncCommand(cbdata->redis,
- rspamd_redis_stat_key,
- redis_elt,
- "HLEN %s",
- k);
- redisAsyncCommand(cbdata->redis,
- rspamd_redis_stat_learns,
- redis_elt,
- "HGET %s %s",
- k, learned_key);
- cbdata->inflight += 2;
- }
- }
- }
- }
- }
-
- if (more) {
- /* Get more stat keys */
- redisAsyncCommand(cbdata->redis, rspamd_redis_stat_keys, redis_elt,
- "SSCAN %s_keys %s COUNT %d",
- cbdata->elt->ctx->stcf->symbol,
- more_elt->str,
- cbdata->elt->ctx->max_users);
-
- cbdata->inflight += 1;
- }
- else {
- /* Set up the required keys */
- ucl_object_insert_key(cbdata->cur,
- ucl_object_typed_new(UCL_INT), "revision", 0, false);
- ucl_object_insert_key(cbdata->cur,
- ucl_object_typed_new(UCL_INT), "used", 0, false);
- ucl_object_insert_key(cbdata->cur,
- ucl_object_typed_new(UCL_INT), "total", 0, false);
- ucl_object_insert_key(cbdata->cur,
- ucl_object_typed_new(UCL_INT), "size", 0, false);
- ucl_object_insert_key(cbdata->cur,
- ucl_object_fromstring(cbdata->elt->ctx->stcf->symbol),
- "symbol", 0, false);
- ucl_object_insert_key(cbdata->cur, ucl_object_fromstring("redis"),
- "type", 0, false);
- ucl_object_insert_key(cbdata->cur, ucl_object_fromint(0),
- "languages", 0, false);
- ucl_object_insert_key(cbdata->cur, ucl_object_fromint(processed),
- "users", 0, false);
-
- rspamd_upstream_ok(cbdata->selected);
-
- if (cbdata->inflight == 0) {
- rspamd_redis_async_cbdata_cleanup(cbdata);
- redis_elt->cbdata = nullptr;
- }
- }
- }
- else {
- if (c->errstr) {
- msg_err("cannot get keys to gather stat: %s", c->errstr);
- }
- else {
- msg_err("cannot get keys to gather stat: unknown error");
- }
-
- rspamd_upstream_fail(cbdata->selected, FALSE, c->errstr);
- rspamd_redis_async_cbdata_cleanup(cbdata);
- redis_elt->cbdata = nullptr;
- }
-}
-
-static void
-rspamd_redis_async_stat_cb(struct rspamd_stat_async_elt *elt, gpointer d)
-{
- struct redis_stat_ctx *ctx;
- struct rspamd_redis_stat_elt *redis_elt = elt->ud;
- struct rspamd_redis_stat_cbdata *cbdata;
- rspamd_inet_addr_t *addr;
- struct upstream_list *ups;
- redisAsyncContext *redis_ctx;
- struct upstream *selected;
-
- g_assert(redis_elt != nullptr);
-
- ctx = redis_elt->ctx;
-
- if (redis_elt->cbdata) {
- /* We have some other process pending */
- rspamd_redis_async_cbdata_cleanup(redis_elt->cbdata);
- redis_elt->cbdata = nullptr;
- }
-
- /* Disable further events unless needed */
- elt->enabled = FALSE;
-
- ups = rspamd_redis_get_servers(ctx, "read_servers");
-
- if (!ups) {
- return;
- }
-
- selected = rspamd_upstream_get(ups,
- RSPAMD_UPSTREAM_ROUND_ROBIN,
- nullptr,
- 0);
-
- g_assert(selected != nullptr);
- addr = rspamd_upstream_addr_next(selected);
- g_assert(addr != nullptr);
-
- if (rspamd_inet_address_get_af(addr) == AF_UNIX) {
- redis_ctx = redisAsyncConnectUnix(rspamd_inet_address_to_string(addr));
- }
- else {
- redis_ctx = redisAsyncConnect(rspamd_inet_address_to_string(addr),
- rspamd_inet_address_get_port(addr));
- }
-
- if (redis_ctx == nullptr) {
- msg_warn("cannot connect to redis server %s: %s",
- rspamd_inet_address_to_string_pretty(addr),
- strerror(errno));
-
- return;
- }
- else if (redis_ctx->err != REDIS_OK) {
- msg_warn("cannot connect to redis server %s: %s",
- rspamd_inet_address_to_string_pretty(addr),
- redis_ctx->errstr);
- redisAsyncFree(redis_ctx);
-
- return;
- }
-
- redisLibevAttach(redis_elt->event_loop, redis_ctx);
- cbdata = g_malloc0(sizeof(*cbdata));
- cbdata->redis = redis_ctx;
- cbdata->selected = selected;
- cbdata->inflight = 1;
- cbdata->cur = ucl_object_typed_new(UCL_OBJECT);
- cbdata->elt = redis_elt;
- cbdata->cur_keys = g_ptr_array_sized_new(ctx->max_users);
- redis_elt->cbdata = cbdata;
-
- /* XXX: deal with timeouts maybe */
- /* Get keys in redis that match our symbol */
- redisAsyncCommand(cbdata->redis, rspamd_redis_stat_keys, redis_elt,
- "SSCAN %s_keys 0 COUNT %d",
- ctx->stcf->symbol,
- ctx->max_users);
-}
-
-static void
-rspamd_redis_async_stat_fin(struct rspamd_stat_async_elt *elt, gpointer d)
-{
- struct rspamd_redis_stat_elt *redis_elt = elt->ud;
-
- if (redis_elt->cbdata != nullptr) {
- rspamd_redis_async_cbdata_cleanup(redis_elt->cbdata);
- redis_elt->cbdata = nullptr;
- }
-
- /* Clear the static elements */
- if (redis_elt->stat) {
- ucl_object_unref(redis_elt->stat);
- redis_elt->stat = nullptr;
- }
-
- g_free(redis_elt);
-}
-
-#endif
-
static int
rspamd_redis_stat_cb(lua_State *L)
{