|
|
@@ -14,24 +14,21 @@ |
|
|
|
* limitations under the License. |
|
|
|
*/ |
|
|
|
#include "config.h" |
|
|
|
#include "lua/lua_common.h" |
|
|
|
#include "rspamd.h" |
|
|
|
#include "stat_internal.h" |
|
|
|
#include "upstream.h" |
|
|
|
#include "lua/lua_common.h" |
|
|
|
#include "libserver/mempool_vars_internal.h" |
|
|
|
#include "hiredis.h" |
|
|
|
#include "adapters/libev.h" |
|
|
|
#include "ref.h" |
|
|
|
|
|
|
|
#define msg_debug_stat_redis(...) rspamd_conditional_debug_fast(NULL, NULL, \ |
|
|
|
#define msg_debug_stat_redis(...) rspamd_conditional_debug_fast(nullptr, nullptr, \ |
|
|
|
rspamd_stat_redis_log_id, "stat_redis", task->task_pool->tag.uid, \ |
|
|
|
RSPAMD_LOG_FUNC, \ |
|
|
|
__VA_ARGS__) |
|
|
|
|
|
|
|
INIT_LOG_MODULE(stat_redis) |
|
|
|
|
|
|
|
#define REDIS_CTX(p) (struct redis_stat_ctx *) (p) |
|
|
|
#define REDIS_RUNTIME(p) (struct redis_stat_runtime *) (p) |
|
|
|
#define REDIS_CTX(p) (reinterpret_cast<struct redis_stat_ctx *>(p)) |
|
|
|
#define REDIS_RUNTIME(p) (reinterpret_cast<struct redis_stat_runtime *>(p)) |
|
|
|
#define REDIS_DEFAULT_OBJECT "%s%l" |
|
|
|
#define REDIS_DEFAULT_USERS_OBJECT "%s%l%r" |
|
|
|
#define REDIS_DEFAULT_TIMEOUT 0.5 |
|
|
@@ -89,7 +86,7 @@ struct rspamd_redis_stat_cbdata { |
|
|
|
gboolean wanna_die; |
|
|
|
}; |
|
|
|
|
|
|
|
#define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt) |
|
|
|
#define GET_TASK_ELT(task, elt) (task == nullptr ? nullptr : (task)->elt) |
|
|
|
|
|
|
|
static const gchar *M = "redis statistics"; |
|
|
|
|
|
|
@@ -132,17 +129,17 @@ gsize rspamd_redis_expand_object(const gchar *pattern, |
|
|
|
mod_char |
|
|
|
} state = just_char; |
|
|
|
struct rspamd_statfile_config *stcf; |
|
|
|
lua_State *L = NULL; |
|
|
|
lua_State *L = nullptr; |
|
|
|
struct rspamd_task **ptask; |
|
|
|
const gchar *rcpt = NULL; |
|
|
|
const gchar *rcpt = nullptr; |
|
|
|
gint err_idx; |
|
|
|
|
|
|
|
g_assert(ctx != NULL); |
|
|
|
g_assert(task != NULL); |
|
|
|
g_assert(ctx != nullptr); |
|
|
|
g_assert(task != nullptr); |
|
|
|
stcf = ctx->stcf; |
|
|
|
|
|
|
|
L = task->cfg->lua_state; |
|
|
|
g_assert(L != NULL); |
|
|
|
L = RSPAMD_LUA_CFG_STATE(task->cfg); |
|
|
|
g_assert(L != nullptr); |
|
|
|
|
|
|
|
if (ctx->enable_users) { |
|
|
|
if (ctx->cbref_user == -1) { |
|
|
@@ -154,7 +151,7 @@ gsize rspamd_redis_expand_object(const gchar *pattern, |
|
|
|
err_idx = lua_gettop(L); |
|
|
|
|
|
|
|
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cbref_user); |
|
|
|
ptask = lua_newuserdata(L, sizeof(struct rspamd_task *)); |
|
|
|
ptask = (struct rspamd_task **) lua_newuserdata(L, sizeof(struct rspamd_task *)); |
|
|
|
*ptask = task; |
|
|
|
rspamd_lua_setclass(L, "rspamd{task}", -1); |
|
|
|
|
|
|
@@ -172,7 +169,7 @@ gsize rspamd_redis_expand_object(const gchar *pattern, |
|
|
|
|
|
|
|
if (rcpt) { |
|
|
|
rspamd_mempool_set_variable(task->task_pool, "stat_user", |
|
|
|
(gpointer) rcpt, NULL); |
|
|
|
(gpointer) rcpt, nullptr); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -202,7 +199,7 @@ gsize rspamd_redis_expand_object(const gchar *pattern, |
|
|
|
break; |
|
|
|
case 'r': |
|
|
|
|
|
|
|
if (rcpt == NULL) { |
|
|
|
if (rcpt == nullptr) { |
|
|
|
elt = rspamd_task_get_principal_recipient(task); |
|
|
|
} |
|
|
|
else { |
|
|
@@ -256,11 +253,11 @@ gsize rspamd_redis_expand_object(const gchar *pattern, |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (target == NULL) { |
|
|
|
if (target == nullptr) { |
|
|
|
return -1; |
|
|
|
} |
|
|
|
|
|
|
|
*target = rspamd_mempool_alloc(task->task_pool, tlen + 1); |
|
|
|
*target = (gchar *) rspamd_mempool_alloc(task->task_pool, tlen + 1); |
|
|
|
d = *target; |
|
|
|
end = d + tlen + 1; |
|
|
|
d[tlen] = '\0'; |
|
|
@@ -292,7 +289,7 @@ gsize rspamd_redis_expand_object(const gchar *pattern, |
|
|
|
} |
|
|
|
break; |
|
|
|
case 'r': |
|
|
|
if (rcpt == NULL) { |
|
|
|
if (rcpt == nullptr) { |
|
|
|
elt = rspamd_task_get_principal_recipient(task); |
|
|
|
} |
|
|
|
else { |
|
|
@@ -367,7 +364,7 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
guint i, l0, l1, cmd_len, prefix_len; |
|
|
|
gint ret; |
|
|
|
|
|
|
|
g_assert(tokens != NULL); |
|
|
|
g_assert(tokens != nullptr); |
|
|
|
|
|
|
|
cmd_len = strlen(command); |
|
|
|
prefix_len = strlen(prefix); |
|
|
@@ -376,14 +373,14 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
if (learn) { |
|
|
|
rspamd_printf_fstring(&out, "*1\r\n$5\r\nMULTI\r\n"); |
|
|
|
|
|
|
|
ret = redisAsyncFormattedCommand(rt->redis, NULL, NULL, |
|
|
|
ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, |
|
|
|
out->str, out->len); |
|
|
|
|
|
|
|
if (ret != REDIS_OK) { |
|
|
|
msg_err_task("call to redis failed: %s", rt->redis->errstr); |
|
|
|
rspamd_fstring_free(out); |
|
|
|
|
|
|
|
return NULL; |
|
|
|
return nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
out->len = 0; |
|
|
@@ -393,14 +390,14 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
/* Multi + HGET */ |
|
|
|
rspamd_printf_fstring(&out, "*1\r\n$5\r\nMULTI\r\n"); |
|
|
|
|
|
|
|
ret = redisAsyncFormattedCommand(rt->redis, NULL, NULL, |
|
|
|
ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, |
|
|
|
out->str, out->len); |
|
|
|
|
|
|
|
if (ret != REDIS_OK) { |
|
|
|
msg_err_task("call to redis failed: %s", rt->redis->errstr); |
|
|
|
rspamd_fstring_free(out); |
|
|
|
|
|
|
|
return NULL; |
|
|
|
return nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
out->len = 0; |
|
|
@@ -476,14 +473,14 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
l1, n1); |
|
|
|
} |
|
|
|
|
|
|
|
ret = redisAsyncFormattedCommand(rt->redis, NULL, NULL, |
|
|
|
ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, |
|
|
|
out->str, out->len); |
|
|
|
|
|
|
|
if (ret != REDIS_OK) { |
|
|
|
msg_err_task("call to redis failed: %s", rt->redis->errstr); |
|
|
|
rspamd_fstring_free(out); |
|
|
|
|
|
|
|
return NULL; |
|
|
|
return nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
if (rt->ctx->store_tokens) { |
|
|
@@ -495,7 +492,7 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
* ZINCRBY prefix_z 1.0 <token_id> |
|
|
|
*/ |
|
|
|
if (tok->t1 && tok->t2) { |
|
|
|
redisAsyncCommand(rt->redis, NULL, NULL, |
|
|
|
redisAsyncCommand(rt->redis, nullptr, nullptr, |
|
|
|
"HSET %b_tokens %b %b:%b", |
|
|
|
prefix, (size_t) prefix_len, |
|
|
|
n0, (size_t) l0, |
|
|
@@ -503,7 +500,7 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
tok->t2->stemmed.begin, tok->t2->stemmed.len); |
|
|
|
} |
|
|
|
else if (tok->t1) { |
|
|
|
redisAsyncCommand(rt->redis, NULL, NULL, |
|
|
|
redisAsyncCommand(rt->redis, nullptr, nullptr, |
|
|
|
"HSET %b_tokens %b %b", |
|
|
|
prefix, (size_t) prefix_len, |
|
|
|
n0, (size_t) l0, |
|
|
@@ -518,7 +515,7 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
* ZINCRBY prefix_z 1.0 <token_id> |
|
|
|
*/ |
|
|
|
if (tok->t1 && tok->t2) { |
|
|
|
redisAsyncCommand(rt->redis, NULL, NULL, |
|
|
|
redisAsyncCommand(rt->redis, nullptr, nullptr, |
|
|
|
"HSET %b %s %b:%b", |
|
|
|
n0, (size_t) l0, |
|
|
|
"tokens", |
|
|
@@ -526,7 +523,7 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
tok->t2->stemmed.begin, tok->t2->stemmed.len); |
|
|
|
} |
|
|
|
else if (tok->t1) { |
|
|
|
redisAsyncCommand(rt->redis, NULL, NULL, |
|
|
|
redisAsyncCommand(rt->redis, nullptr, nullptr, |
|
|
|
"HSET %b %s %b", |
|
|
|
n0, (size_t) l0, |
|
|
|
"tokens", |
|
|
@@ -534,7 +531,7 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
redisAsyncCommand(rt->redis, NULL, NULL, |
|
|
|
redisAsyncCommand(rt->redis, nullptr, nullptr, |
|
|
|
"ZINCRBY %b_z %b %b", |
|
|
|
prefix, (size_t) prefix_len, |
|
|
|
n1, (size_t) l1, |
|
|
@@ -556,7 +553,7 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
"%s\r\n", |
|
|
|
l0, n0, |
|
|
|
l1, n1); |
|
|
|
redisAsyncFormattedCommand(rt->redis, NULL, NULL, |
|
|
|
redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, |
|
|
|
out->str, out->len); |
|
|
|
} |
|
|
|
|
|
|
@@ -580,14 +577,14 @@ rspamd_redis_tokens_to_query(struct rspamd_task *task, |
|
|
|
l0, n0, |
|
|
|
1, rt->stcf->is_spam ? "S" : "H"); |
|
|
|
|
|
|
|
ret = redisAsyncFormattedCommand(rt->redis, NULL, NULL, |
|
|
|
ret = redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, |
|
|
|
out->str, out->len); |
|
|
|
|
|
|
|
if (ret != REDIS_OK) { |
|
|
|
msg_err_task("call to redis failed: %s", rt->redis->errstr); |
|
|
|
rspamd_fstring_free(out); |
|
|
|
|
|
|
|
return NULL; |
|
|
|
return nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
out->len = 0; |
|
|
@@ -624,7 +621,7 @@ rspamd_redis_store_stat_signature(struct rspamd_task *task, |
|
|
|
sig = rspamd_mempool_get_variable(task->task_pool, |
|
|
|
RSPAMD_MEMPOOL_STAT_SIGNATURE); |
|
|
|
|
|
|
|
if (sig == NULL) { |
|
|
|
if (sig == nullptr) { |
|
|
|
msg_err_task("cannot get bayes signature"); |
|
|
|
return; |
|
|
|
} |
|
|
@@ -641,7 +638,7 @@ rspamd_redis_store_stat_signature(struct rspamd_task *task, |
|
|
|
"$%d\r\n" |
|
|
|
"%s\r\n", |
|
|
|
klen, keybuf); |
|
|
|
redisAsyncFormattedCommand(rt->redis, NULL, NULL, |
|
|
|
redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, |
|
|
|
out->str, out->len); |
|
|
|
out->len = 0; |
|
|
|
|
|
|
@@ -663,7 +660,7 @@ rspamd_redis_store_stat_signature(struct rspamd_task *task, |
|
|
|
blen, nbuf); |
|
|
|
} |
|
|
|
|
|
|
|
redisAsyncFormattedCommand(rt->redis, NULL, NULL, |
|
|
|
redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, |
|
|
|
out->str, out->len); |
|
|
|
out->len = 0; |
|
|
|
|
|
|
@@ -682,15 +679,13 @@ rspamd_redis_store_stat_signature(struct rspamd_task *task, |
|
|
|
"%s\r\n", |
|
|
|
klen, keybuf, |
|
|
|
blen, nbuf); |
|
|
|
redisAsyncFormattedCommand(rt->redis, NULL, NULL, |
|
|
|
redisAsyncFormattedCommand(rt->redis, nullptr, nullptr, |
|
|
|
out->str, out->len); |
|
|
|
} |
|
|
|
|
|
|
|
rspamd_fstring_free(out); |
|
|
|
} |
|
|
|
|
|
|
|
#endif |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_redis_async_cbdata_cleanup(struct rspamd_redis_stat_cbdata *cbdata) |
|
|
|
{ |
|
|
@@ -710,7 +705,7 @@ rspamd_redis_async_cbdata_cleanup(struct rspamd_redis_stat_cbdata *cbdata) |
|
|
|
g_ptr_array_free(cbdata->cur_keys, TRUE); |
|
|
|
|
|
|
|
if (cbdata->elt) { |
|
|
|
cbdata->elt->cbdata = NULL; |
|
|
|
cbdata->elt->cbdata = nullptr; |
|
|
|
/* Re-enable parent event */ |
|
|
|
cbdata->elt->async->enabled = TRUE; |
|
|
|
|
|
|
@@ -721,7 +716,7 @@ rspamd_redis_async_cbdata_cleanup(struct rspamd_redis_stat_cbdata *cbdata) |
|
|
|
} |
|
|
|
|
|
|
|
cbdata->elt->stat = cbdata->cur; |
|
|
|
cbdata->cur = NULL; |
|
|
|
cbdata->cur = nullptr; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -745,13 +740,13 @@ rspamd_redis_stat_learns(redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
|
|
|
|
cbdata = redis_elt->cbdata; |
|
|
|
|
|
|
|
if (cbdata == NULL || cbdata->wanna_die) { |
|
|
|
if (cbdata == nullptr || cbdata->wanna_die) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
cbdata->inflight--; |
|
|
|
|
|
|
|
if (c->err == 0 && r != NULL) { |
|
|
|
if (c->err == 0 && r != nullptr) { |
|
|
|
if (G_LIKELY(reply->type == REDIS_REPLY_INTEGER)) { |
|
|
|
num = reply->integer; |
|
|
|
} |
|
|
@@ -767,7 +762,7 @@ rspamd_redis_stat_learns(redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
|
|
|
|
if (cbdata->inflight == 0) { |
|
|
|
rspamd_redis_async_cbdata_cleanup(cbdata); |
|
|
|
redis_elt->cbdata = NULL; |
|
|
|
redis_elt->cbdata = nullptr; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -783,13 +778,13 @@ rspamd_redis_stat_key(redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
|
|
|
|
cbdata = redis_elt->cbdata; |
|
|
|
|
|
|
|
if (cbdata == NULL || cbdata->wanna_die) { |
|
|
|
if (cbdata == nullptr || cbdata->wanna_die) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
cbdata->inflight--; |
|
|
|
|
|
|
|
if (c->err == 0 && r != NULL) { |
|
|
|
if (c->err == 0 && r != nullptr) { |
|
|
|
if (G_LIKELY(reply->type == REDIS_REPLY_INTEGER)) { |
|
|
|
num = reply->integer; |
|
|
|
} |
|
|
@@ -822,7 +817,7 @@ rspamd_redis_stat_key(redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
|
|
|
|
if (cbdata->inflight == 0) { |
|
|
|
rspamd_redis_async_cbdata_cleanup(cbdata); |
|
|
|
redis_elt->cbdata = NULL; |
|
|
|
redis_elt->cbdata = nullptr; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -839,18 +834,18 @@ rspamd_redis_stat_keys(redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
|
|
|
|
cbdata = redis_elt->cbdata; |
|
|
|
|
|
|
|
if (cbdata == NULL || cbdata->wanna_die) { |
|
|
|
if (cbdata == nullptr || cbdata->wanna_die) { |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
cbdata->inflight--; |
|
|
|
|
|
|
|
if (c->err == 0 && r != NULL) { |
|
|
|
if (c->err == 0 && r != nullptr) { |
|
|
|
if (reply->type == REDIS_REPLY_ARRAY) { |
|
|
|
more_elt = reply->element[0]; |
|
|
|
elts = reply->element[1]; |
|
|
|
|
|
|
|
if (more_elt != NULL && more_elt->str != NULL && strcmp(more_elt->str, "0") != 0) { |
|
|
|
if (more_elt != nullptr && more_elt->str != nullptr && strcmp(more_elt->str, "0") != 0) { |
|
|
|
more = true; |
|
|
|
} |
|
|
|
|
|
|
@@ -875,7 +870,7 @@ rspamd_redis_stat_keys(redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
} |
|
|
|
else { |
|
|
|
pk = (gchar **) &g_ptr_array_index(cbdata->cur_keys, i); |
|
|
|
*pk = NULL; |
|
|
|
*pk = nullptr; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -951,7 +946,7 @@ rspamd_redis_stat_keys(redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
|
|
|
|
if (cbdata->inflight == 0) { |
|
|
|
rspamd_redis_async_cbdata_cleanup(cbdata); |
|
|
|
redis_elt->cbdata = NULL; |
|
|
|
redis_elt->cbdata = nullptr; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@@ -965,7 +960,7 @@ rspamd_redis_stat_keys(redisAsyncContext *c, gpointer r, gpointer priv) |
|
|
|
|
|
|
|
rspamd_upstream_fail(cbdata->selected, FALSE, c->errstr); |
|
|
|
rspamd_redis_async_cbdata_cleanup(cbdata); |
|
|
|
redis_elt->cbdata = NULL; |
|
|
|
redis_elt->cbdata = nullptr; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -980,14 +975,14 @@ rspamd_redis_async_stat_cb(struct rspamd_stat_async_elt *elt, gpointer d) |
|
|
|
redisAsyncContext *redis_ctx; |
|
|
|
struct upstream *selected; |
|
|
|
|
|
|
|
g_assert(redis_elt != NULL); |
|
|
|
g_assert(redis_elt != nullptr); |
|
|
|
|
|
|
|
ctx = redis_elt->ctx; |
|
|
|
|
|
|
|
if (redis_elt->cbdata) { |
|
|
|
/* We have some other process pending */ |
|
|
|
rspamd_redis_async_cbdata_cleanup(redis_elt->cbdata); |
|
|
|
redis_elt->cbdata = NULL; |
|
|
|
redis_elt->cbdata = nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
/* Disable further events unless needed */ |
|
|
@@ -1001,12 +996,12 @@ rspamd_redis_async_stat_cb(struct rspamd_stat_async_elt *elt, gpointer d) |
|
|
|
|
|
|
|
selected = rspamd_upstream_get(ups, |
|
|
|
RSPAMD_UPSTREAM_ROUND_ROBIN, |
|
|
|
NULL, |
|
|
|
nullptr, |
|
|
|
0); |
|
|
|
|
|
|
|
g_assert(selected != NULL); |
|
|
|
g_assert(selected != nullptr); |
|
|
|
addr = rspamd_upstream_addr_next(selected); |
|
|
|
g_assert(addr != NULL); |
|
|
|
g_assert(addr != nullptr); |
|
|
|
|
|
|
|
if (rspamd_inet_address_get_af(addr) == AF_UNIX) { |
|
|
|
redis_ctx = redisAsyncConnectUnix(rspamd_inet_address_to_string(addr)); |
|
|
@@ -1016,7 +1011,7 @@ rspamd_redis_async_stat_cb(struct rspamd_stat_async_elt *elt, gpointer d) |
|
|
|
rspamd_inet_address_get_port(addr)); |
|
|
|
} |
|
|
|
|
|
|
|
if (redis_ctx == NULL) { |
|
|
|
if (redis_ctx == nullptr) { |
|
|
|
msg_warn("cannot connect to redis server %s: %s", |
|
|
|
rspamd_inet_address_to_string_pretty(addr), |
|
|
|
strerror(errno)); |
|
|
@@ -1055,20 +1050,23 @@ rspamd_redis_async_stat_fin(struct rspamd_stat_async_elt *elt, gpointer d) |
|
|
|
{ |
|
|
|
struct rspamd_redis_stat_elt *redis_elt = elt->ud; |
|
|
|
|
|
|
|
if (redis_elt->cbdata != NULL) { |
|
|
|
if (redis_elt->cbdata != nullptr) { |
|
|
|
rspamd_redis_async_cbdata_cleanup(redis_elt->cbdata); |
|
|
|
redis_elt->cbdata = NULL; |
|
|
|
redis_elt->cbdata = nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
/* Clear the static elements */ |
|
|
|
if (redis_elt->stat) { |
|
|
|
ucl_object_unref(redis_elt->stat); |
|
|
|
redis_elt->stat = NULL; |
|
|
|
redis_elt->stat = nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
g_free(redis_elt); |
|
|
|
} |
|
|
|
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
|
|
/* Called on connection termination */ |
|
|
|
static void |
|
|
|
rspamd_redis_fin(gpointer data) |
|
|
@@ -1093,11 +1091,12 @@ rspamd_redis_parse_classifier_opts(struct redis_stat_ctx *backend, |
|
|
|
{ |
|
|
|
const gchar *lua_script; |
|
|
|
const ucl_object_t *elt, *users_enabled; |
|
|
|
auto *L = RSPAMD_LUA_CFG_STATE(cfg); |
|
|
|
|
|
|
|
users_enabled = ucl_object_lookup_any(classifier_obj, "per_user", |
|
|
|
"users_enabled", NULL); |
|
|
|
"users_enabled", nullptr); |
|
|
|
|
|
|
|
if (users_enabled != NULL) { |
|
|
|
if (users_enabled != nullptr) { |
|
|
|
if (ucl_object_type(users_enabled) == UCL_BOOLEAN) { |
|
|
|
backend->enable_users = ucl_object_toboolean(users_enabled); |
|
|
|
backend->cbref_user = -1; |
|
|
@@ -1105,22 +1104,21 @@ rspamd_redis_parse_classifier_opts(struct redis_stat_ctx *backend, |
|
|
|
else if (ucl_object_type(users_enabled) == UCL_STRING) { |
|
|
|
lua_script = ucl_object_tostring(users_enabled); |
|
|
|
|
|
|
|
if (luaL_dostring(cfg->lua_state, lua_script) != 0) { |
|
|
|
if (luaL_dostring(L, lua_script) != 0) { |
|
|
|
msg_err_config("cannot execute lua script for users " |
|
|
|
"extraction: %s", |
|
|
|
lua_tostring(cfg->lua_state, -1)); |
|
|
|
lua_tostring(L, -1)); |
|
|
|
} |
|
|
|
else { |
|
|
|
if (lua_type(cfg->lua_state, -1) == LUA_TFUNCTION) { |
|
|
|
if (lua_type(L, -1) == LUA_TFUNCTION) { |
|
|
|
backend->enable_users = TRUE; |
|
|
|
backend->cbref_user = luaL_ref(cfg->lua_state, |
|
|
|
backend->cbref_user = luaL_ref(L, |
|
|
|
LUA_REGISTRYINDEX); |
|
|
|
} |
|
|
|
else { |
|
|
|
msg_err_config("lua script must return " |
|
|
|
"function(task) and not %s", |
|
|
|
lua_typename(cfg->lua_state, lua_type( |
|
|
|
cfg->lua_state, -1))); |
|
|
|
lua_typename(L, lua_type(L, -1))); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@@ -1131,7 +1129,7 @@ rspamd_redis_parse_classifier_opts(struct redis_stat_ctx *backend, |
|
|
|
} |
|
|
|
|
|
|
|
elt = ucl_object_lookup(classifier_obj, "prefix"); |
|
|
|
if (elt == NULL || ucl_object_type(elt) != UCL_STRING) { |
|
|
|
if (elt == nullptr || ucl_object_type(elt) != UCL_STRING) { |
|
|
|
/* Default non-users statistics */ |
|
|
|
if (backend->enable_users || backend->cbref_user != -1) { |
|
|
|
backend->redis_object = REDIS_DEFAULT_USERS_OBJECT; |
|
|
@@ -1161,7 +1159,7 @@ rspamd_redis_parse_classifier_opts(struct redis_stat_ctx *backend, |
|
|
|
backend->enable_signatures = FALSE; |
|
|
|
} |
|
|
|
|
|
|
|
elt = ucl_object_lookup_any(classifier_obj, "expiry", "expire", NULL); |
|
|
|
elt = ucl_object_lookup_any(classifier_obj, "expiry", "expire", nullptr); |
|
|
|
if (elt) { |
|
|
|
backend->expiry = ucl_object_toint(elt); |
|
|
|
} |
|
|
@@ -1177,9 +1175,8 @@ rspamd_redis_parse_classifier_opts(struct redis_stat_ctx *backend, |
|
|
|
backend->max_users = REDIS_MAX_USERS; |
|
|
|
} |
|
|
|
|
|
|
|
lua_State *L = RSPAMD_LUA_CFG_STATE(cfg); |
|
|
|
lua_pushcfunction(L, &rspamd_lua_traceback); |
|
|
|
int err_idx = lua_gettop(L); |
|
|
|
auto err_idx = lua_gettop(L); |
|
|
|
|
|
|
|
/* Obtain function */ |
|
|
|
if (!rspamd_lua_require_function(L, "lua_bayes_redis", "lua_bayes_init_classifier")) { |
|
|
@@ -1199,7 +1196,7 @@ rspamd_redis_parse_classifier_opts(struct redis_stat_ctx *backend, |
|
|
|
lua_tostring(L, -1)); |
|
|
|
lua_settop(L, err_idx - 1); |
|
|
|
|
|
|
|
return NULL; |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
/* Results are in the stack: |
|
|
@@ -1222,15 +1219,10 @@ gpointer |
|
|
|
rspamd_redis_init(struct rspamd_stat_ctx *ctx, |
|
|
|
struct rspamd_config *cfg, struct rspamd_statfile *st) |
|
|
|
{ |
|
|
|
struct redis_stat_ctx *backend; |
|
|
|
struct rspamd_statfile_config *stf = st->stcf; |
|
|
|
struct rspamd_redis_stat_elt *st_elt; |
|
|
|
const ucl_object_t *obj; |
|
|
|
gboolean ret = FALSE; |
|
|
|
gint conf_ref = -1; |
|
|
|
lua_State *L = (lua_State *) cfg->lua_state; |
|
|
|
auto *L = (lua_State *) cfg->lua_state; |
|
|
|
|
|
|
|
backend = g_malloc0(sizeof(*backend)); |
|
|
|
auto *backend = g_new0(struct redis_stat_ctx, 1); |
|
|
|
backend->L = L; |
|
|
|
backend->max_users = REDIS_MAX_USERS; |
|
|
|
|
|
|
@@ -1239,15 +1231,15 @@ rspamd_redis_init(struct rspamd_stat_ctx *ctx, |
|
|
|
lua_settop(L, 0); |
|
|
|
|
|
|
|
if (!rspamd_redis_parse_classifier_opts(backend, st->stcf->opts, st->classifier->cfg->opts, cfg)) { |
|
|
|
msg_err_config("cannot init redis backend for %s", stf->symbol); |
|
|
|
msg_err_config("cannot init redis backend for %s", st->stcf->symbol); |
|
|
|
g_free(backend); |
|
|
|
return NULL; |
|
|
|
return nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
stf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND; |
|
|
|
backend->stcf = stf; |
|
|
|
st->stcf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND; |
|
|
|
backend->stcf = st->stcf; |
|
|
|
|
|
|
|
st_elt = g_malloc0(sizeof(*st_elt)); |
|
|
|
auto *st_elt = g_new0(struct rspamd_redis_stat_elt, 1); |
|
|
|
st_elt->event_loop = ctx->event_loop; |
|
|
|
st_elt->ctx = backend; |
|
|
|
#if 0 |
|
|
@@ -1269,10 +1261,10 @@ rspamd_redis_runtime(struct rspamd_task *task, |
|
|
|
{ |
|
|
|
struct redis_stat_ctx *ctx = REDIS_CTX(c); |
|
|
|
struct redis_stat_runtime *rt; |
|
|
|
char *object_expanded = NULL; |
|
|
|
char *object_expanded = nullptr; |
|
|
|
|
|
|
|
g_assert(ctx != NULL); |
|
|
|
g_assert(stcf != NULL); |
|
|
|
g_assert(ctx != nullptr); |
|
|
|
g_assert(stcf != nullptr); |
|
|
|
|
|
|
|
if (rspamd_redis_expand_object(ctx->redis_object, ctx, task, |
|
|
|
&object_expanded) == 0) { |
|
|
@@ -1280,13 +1272,13 @@ rspamd_redis_runtime(struct rspamd_task *task, |
|
|
|
"(maybe learning per user classifier with no user or recipient)", |
|
|
|
learn ? "learning" : "classifying", |
|
|
|
stcf->symbol); |
|
|
|
return NULL; |
|
|
|
return nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
/* Look for the cached results */ |
|
|
|
|
|
|
|
|
|
|
|
rt = rspamd_mempool_alloc0(task->task_pool, sizeof(*rt)); |
|
|
|
rt = (struct redis_stat_runtime *) rspamd_mempool_alloc0(task->task_pool, sizeof(*rt)); |
|
|
|
rt->task = task; |
|
|
|
rt->ctx = ctx; |
|
|
|
rt->redis_object_expanded = object_expanded; |
|
|
@@ -1328,7 +1320,7 @@ rspamd_redis_serialize_tokens(struct rspamd_task *task, GPtrArray *tokens, gsize |
|
|
|
gchar *buf, *p; |
|
|
|
rspamd_token_t *tok; |
|
|
|
|
|
|
|
buf = rspamd_mempool_alloc(task->task_pool, req_len); |
|
|
|
buf = (gchar *) rspamd_mempool_alloc(task->task_pool, req_len); |
|
|
|
p = buf; |
|
|
|
|
|
|
|
/* Array */ |
|
|
@@ -1361,7 +1353,7 @@ rspamd_redis_classified(lua_State *L) |
|
|
|
struct redis_stat_runtime *rt = REDIS_RUNTIME(rspamd_mempool_get_variable(task->task_pool, cookie)); |
|
|
|
/* TODO: write it */ |
|
|
|
|
|
|
|
if (rt == NULL) { |
|
|
|
if (rt == nullptr) { |
|
|
|
msg_err_task("internal error: cannot find runtime for cookie %s", cookie); |
|
|
|
|
|
|
|
return 0; |
|
|
@@ -1389,7 +1381,7 @@ rspamd_redis_process_tokens(struct rspamd_task *task, |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
|
|
|
|
if (tokens == NULL || tokens->len == 0) { |
|
|
|
if (tokens == nullptr || tokens->len == 0) { |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
|
|
|
@@ -1412,10 +1404,10 @@ rspamd_redis_process_tokens(struct rspamd_task *task, |
|
|
|
lua_new_text(L, tokens_buf, tokens_len, false); |
|
|
|
|
|
|
|
/* Store rt in random cookie */ |
|
|
|
gchar *cookie = rspamd_mempool_alloc(task->task_pool, 16); |
|
|
|
char *cookie = (char *) rspamd_mempool_alloc(task->task_pool, 16); |
|
|
|
rspamd_random_hex(cookie, 16); |
|
|
|
cookie[15] = '\0'; |
|
|
|
rspamd_mempool_set_variable(task->task_pool, cookie, rt, NULL); |
|
|
|
rspamd_mempool_set_variable(task->task_pool, cookie, rt, nullptr); |
|
|
|
/* Callback */ |
|
|
|
lua_pushstring(L, cookie); |
|
|
|
lua_pushcclosure(L, &rspamd_redis_classified, 1); |
|
|
@@ -1442,7 +1434,7 @@ rspamd_redis_finalize_process(struct rspamd_task *task, gpointer runtime, |
|
|
|
if (rt->err) { |
|
|
|
msg_info_task("cannot retrieve stat tokens from Redis: %e", rt->err); |
|
|
|
g_error_free(rt->err); |
|
|
|
rt->err = NULL; |
|
|
|
rt->err = nullptr; |
|
|
|
rspamd_redis_fin(rt); |
|
|
|
|
|
|
|
return FALSE; |
|
|
@@ -1458,7 +1450,6 @@ rspamd_redis_learn_tokens(struct rspamd_task *task, GPtrArray *tokens, |
|
|
|
gint id, gpointer p) |
|
|
|
{ |
|
|
|
struct redis_stat_runtime *rt = REDIS_RUNTIME(p); |
|
|
|
lua_State *L = rt->ctx->L; |
|
|
|
|
|
|
|
/* TODO: write learn function */ |
|
|
|
|
|
|
@@ -1474,7 +1465,7 @@ rspamd_redis_finalize_learn(struct rspamd_task *task, gpointer runtime, |
|
|
|
|
|
|
|
if (rt->err) { |
|
|
|
g_propagate_error(err, rt->err); |
|
|
|
rt->err = NULL; |
|
|
|
rt->err = nullptr; |
|
|
|
rspamd_redis_fin(rt); |
|
|
|
|
|
|
|
return FALSE; |
|
|
@@ -1532,19 +1523,19 @@ rspamd_redis_get_stat(gpointer runtime, |
|
|
|
redisAsyncContext *redis; |
|
|
|
|
|
|
|
if (rt->ctx->stat_elt) { |
|
|
|
st = rt->ctx->stat_elt->ud; |
|
|
|
st = (struct rspamd_redis_stat_elt *) rt->ctx->stat_elt->ud; |
|
|
|
|
|
|
|
if (st->stat) { |
|
|
|
return ucl_object_ref(st->stat); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return NULL; |
|
|
|
return nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
gpointer |
|
|
|
rspamd_redis_load_tokenizer_config(gpointer runtime, |
|
|
|
gsize *len) |
|
|
|
{ |
|
|
|
return NULL; |
|
|
|
return nullptr; |
|
|
|
} |