diff options
-rw-r--r-- | src/libstat/backends/redis_backend.c | 198 |
1 files changed, 157 insertions, 41 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index f51272247..2b0a62f50 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -45,6 +45,8 @@ struct redis_stat_ctx { gdouble timeout; gboolean enable_users; gboolean store_tokens; + gboolean new_schema; + guint expiry; gint cbref_user; }; @@ -337,22 +339,22 @@ static rspamd_fstring_t * rspamd_redis_tokens_to_query (struct rspamd_task *task, struct redis_stat_runtime *rt, GPtrArray *tokens, - const gchar *arg0, - const gchar *arg1, + const gchar *command, + const gchar *prefix, gboolean learn, gint idx, gboolean intvals) { rspamd_fstring_t *out; rspamd_token_t *tok; - gchar n0[64], n1[64]; - guint i, l0, l1, larg0, larg1; + gchar n0[512], n1[64]; + guint i, l0, l1, cmd_len, prefix_len; gint ret; g_assert (tokens != NULL); - larg0 = strlen (arg0); - larg1 = strlen (arg1); + cmd_len = strlen (command); + prefix_len = strlen (prefix); out = rspamd_fstring_sized_new (1024); if (learn) { @@ -371,46 +373,90 @@ rspamd_redis_tokens_to_query (struct rspamd_task *task, out->len = 0; } else { - rspamd_printf_fstring (&out, "" - "*%d\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n", - (tokens->len + 2), - larg0, arg0, - larg1, arg1); + if (rt->ctx->new_schema) { + /* Multi + HGET */ + rspamd_printf_fstring (&out, "*1\r\n$5\r\nMULTI\r\n"); + + ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL, + out->str, out->len); + + if (ret != REDIS_OK) { + msg_err_task ("call to redis failed: %s", rt->redis->errstr); + rspamd_fstring_free (out); + + return NULL; + } + + out->len = 0; + } + else { + rspamd_printf_fstring (&out, "" + "*%d\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n", + (tokens->len + 2), + cmd_len, command, + prefix_len, prefix); + } } for (i = 0; i < tokens->len; i ++) { tok = g_ptr_array_index (tokens, i); if (learn) { - rspamd_printf_fstring (&out, "" - "*4\r\n" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n", - larg0, arg0, - larg1, arg1); - - l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data); - if (intvals) { l1 = rspamd_snprintf (n1, sizeof (n1), "%L", - (gint64)tok->values[idx]); - } - else { + (gint64) tok->values[idx]); + } else { l1 = rspamd_snprintf (n1, sizeof (n1), "%f", tok->values[idx]); } - rspamd_printf_fstring (&out, "" - "$%d\r\n" - "%s\r\n" - "$%d\r\n" - "%s\r\n", l0, n0, l1, n1); + if (rt->ctx->new_schema) { + /* + * HINCRBY <prefix_token> <0|1> <value> + */ + l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL", + prefix_len, prefix, + tok->data); + + rspamd_printf_fstring (&out, "" + "*4\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n", + cmd_len, command, + l0, n0, + 1, rt->stcf->is_spam ? "1" : "0", + l1, n1); + } + else { + l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data); + + /* + * HINCRBY <prefix> <token> <value> + */ + rspamd_printf_fstring (&out, "" + "*4\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n", + cmd_len, command, + prefix_len, prefix, + l0, n0, l1, n1); + } ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL, out->str, out->len); @@ -422,6 +468,23 @@ rspamd_redis_tokens_to_query (struct rspamd_task *task, return NULL; } + if (rt->ctx->new_schema && rt->ctx->expiry > 0) { + l1 = rspamd_snprintf (n1, sizeof (n1), "%d", + rt->ctx->expiry); + + rspamd_printf_fstring (&out, "" + "*3\r\n" + "$5\r\n" + "EXPIRE\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n", + cmd_len, command, + l0, n0, + l1, n1); + } + if (rt->ctx->store_tokens) { /* * We also store tokens in form @@ -431,7 +494,7 @@ rspamd_redis_tokens_to_query (struct rspamd_task *task, if (tok->t1 && tok->t2) { redisAsyncCommand (rt->redis, NULL, NULL, "HSET %b_tokens %b %b:%b", - arg1, (size_t)larg1, + prefix, (size_t)prefix_len, n0, (size_t)l0, tok->t1->begin, tok->t1->len, tok->t2->begin, tok->t2->len); @@ -439,28 +502,64 @@ rspamd_redis_tokens_to_query (struct rspamd_task *task, else if (tok->t1) { redisAsyncCommand (rt->redis, NULL, NULL, "HSET %b_tokens %b %b", - arg1, (size_t)larg1, + prefix, (size_t)prefix_len, n0, (size_t)l0, tok->t1->begin, tok->t1->len); } redisAsyncCommand (rt->redis, NULL, NULL, "ZINCRBY %b_z %b %b", - arg1, (size_t)larg1, + prefix, (size_t)prefix_len, n1, (size_t)l1, n0, (size_t)l0); } + out->len = 0; } else { - l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data); - rspamd_printf_fstring (&out, "" - "$%d\r\n" - "%s\r\n", l0, n0); + if (rt->ctx->new_schema) { + l0 = rspamd_snprintf (n0, sizeof (n0), "%*s_%uL", + prefix_len, prefix, + tok->data); + + rspamd_printf_fstring (&out, "" + "*3\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n" + "$%d\r\n" + "%s\r\n", + cmd_len, command, + l0, n0, + 1, rt->stcf->is_spam ? "1" : "0"); + + ret = redisAsyncFormattedCommand (rt->redis, NULL, NULL, + out->str, out->len); + + if (ret != REDIS_OK) { + msg_err_task ("call to redis failed: %s", rt->redis->errstr); + rspamd_fstring_free (out); + + return NULL; + } + + out->len = 0; + } + else { + l0 = rspamd_snprintf (n0, sizeof (n0), "%uL", tok->data); + rspamd_printf_fstring (&out, "" + "$%d\r\n" + "%s\r\n", l0, n0); + } } } + if (!learn && rt->ctx->new_schema) { + rspamd_printf_fstring (&out, "*1\r\n$4\r\nEXEC\r\n"); + } + return out; } @@ -1088,6 +1187,22 @@ rspamd_redis_try_ucl (struct redis_stat_ctx *backend, backend->store_tokens = FALSE; } + elt = ucl_object_lookup (obj, "new_schema"); + if (elt) { + backend->new_schema = ucl_object_toboolean (elt); + } + else { + backend->new_schema = FALSE; + } + + elt = ucl_object_lookup (obj, "expiry"); + if (elt) { + backend->expiry = ucl_object_toint (elt); + } + else { + backend->expiry = 0; + } + elt = ucl_object_lookup_any (obj, "db", "database", "dbname", NULL); if (elt) { backend->dbname = ucl_object_tostring (elt); @@ -1350,6 +1465,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, addr = rspamd_upstream_addr (up); g_assert (addr != NULL); + if (rspamd_inet_address_get_af (addr) == AF_UNIX) { rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr)); } |