|
|
@@ -24,6 +24,7 @@ |
|
|
|
#include "upstream.h" |
|
|
|
#include "contrib/hiredis/hiredis.h" |
|
|
|
#include "contrib/hiredis/async.h" |
|
|
|
#include "lua/lua_common.h" |
|
|
|
|
|
|
|
#define REDIS_DEFAULT_PORT 6379 |
|
|
|
#define REDIS_DEFAULT_OBJECT "fuzzy" |
|
|
@@ -49,14 +50,14 @@ |
|
|
|
INIT_LOG_MODULE(fuzzy_redis) |
|
|
|
|
|
|
|
struct rspamd_fuzzy_backend_redis { |
|
|
|
struct upstream_list *read_servers; |
|
|
|
struct upstream_list *write_servers; |
|
|
|
lua_State *L; |
|
|
|
const gchar *redis_object; |
|
|
|
const gchar *password; |
|
|
|
const gchar *dbname; |
|
|
|
gchar *id; |
|
|
|
struct rspamd_redis_pool *pool; |
|
|
|
gdouble timeout; |
|
|
|
gint conf_ref; |
|
|
|
ref_entry_t ref; |
|
|
|
}; |
|
|
|
|
|
|
@@ -98,6 +99,22 @@ struct rspamd_fuzzy_redis_session { |
|
|
|
guchar found_digest[rspamd_cryptobox_HASHBYTES]; |
|
|
|
}; |
|
|
|
|
|
|
|
static inline struct upstream_list * |
|
|
|
rspamd_redis_get_servers (struct rspamd_fuzzy_backend_redis *ctx, |
|
|
|
const gchar *what) |
|
|
|
{ |
|
|
|
lua_State *L = ctx->L; |
|
|
|
struct upstream_list *res; |
|
|
|
|
|
|
|
lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref); |
|
|
|
lua_pushstring (L, what); |
|
|
|
lua_gettable (L, -2); |
|
|
|
res = *((struct upstream_list**)lua_touserdata (L, -1)); |
|
|
|
lua_settop (L, 0); |
|
|
|
|
|
|
|
return res; |
|
|
|
} |
|
|
|
|
|
|
|
static inline void |
|
|
|
rspamd_fuzzy_redis_session_free_args (struct rspamd_fuzzy_redis_session *session) |
|
|
|
{ |
|
|
@@ -136,97 +153,13 @@ rspamd_fuzzy_redis_session_dtor (struct rspamd_fuzzy_redis_session *session, |
|
|
|
g_free (session); |
|
|
|
} |
|
|
|
|
|
|
|
static gboolean |
|
|
|
rspamd_fuzzy_backend_redis_try_ucl (struct rspamd_fuzzy_backend_redis *backend, |
|
|
|
const ucl_object_t *obj, |
|
|
|
struct rspamd_config *cfg) |
|
|
|
{ |
|
|
|
const ucl_object_t *elt, *relt; |
|
|
|
|
|
|
|
elt = ucl_object_lookup_any (obj, "read_servers", "servers", NULL); |
|
|
|
|
|
|
|
if (elt == NULL) { |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
|
|
|
|
backend->read_servers = rspamd_upstreams_create (cfg->ups_ctx); |
|
|
|
if (!rspamd_upstreams_from_ucl (backend->read_servers, elt, |
|
|
|
REDIS_DEFAULT_PORT, NULL)) { |
|
|
|
msg_err_config ("cannot get read servers configuration"); |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
|
|
|
|
relt = elt; |
|
|
|
|
|
|
|
elt = ucl_object_lookup (obj, "write_servers"); |
|
|
|
if (elt == NULL) { |
|
|
|
/* Use read servers as write ones */ |
|
|
|
g_assert (relt != NULL); |
|
|
|
backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx); |
|
|
|
if (!rspamd_upstreams_from_ucl (backend->write_servers, relt, |
|
|
|
REDIS_DEFAULT_PORT, NULL)) { |
|
|
|
msg_err_config ("cannot get write servers configuration"); |
|
|
|
return FALSE; |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx); |
|
|
|
if (!rspamd_upstreams_from_ucl (backend->write_servers, elt, |
|
|
|
REDIS_DEFAULT_PORT, NULL)) { |
|
|
|
msg_err_config ("cannot get write servers configuration"); |
|
|
|
rspamd_upstreams_destroy (backend->write_servers); |
|
|
|
backend->write_servers = NULL; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
elt = ucl_object_lookup (obj, "prefix"); |
|
|
|
if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { |
|
|
|
backend->redis_object = REDIS_DEFAULT_OBJECT; |
|
|
|
} |
|
|
|
else { |
|
|
|
backend->redis_object = ucl_object_tostring (elt); |
|
|
|
} |
|
|
|
|
|
|
|
elt = ucl_object_lookup (obj, "timeout"); |
|
|
|
if (elt) { |
|
|
|
backend->timeout = ucl_object_todouble (elt); |
|
|
|
} |
|
|
|
else { |
|
|
|
backend->timeout = REDIS_DEFAULT_TIMEOUT; |
|
|
|
} |
|
|
|
|
|
|
|
elt = ucl_object_lookup (obj, "password"); |
|
|
|
if (elt) { |
|
|
|
backend->password = ucl_object_tostring (elt); |
|
|
|
} |
|
|
|
else { |
|
|
|
backend->password = NULL; |
|
|
|
} |
|
|
|
|
|
|
|
elt = ucl_object_lookup_any (obj, "db", "database", "dbname", NULL); |
|
|
|
if (elt) { |
|
|
|
if (ucl_object_type (elt) == UCL_STRING) { |
|
|
|
backend->dbname = ucl_object_tostring (elt); |
|
|
|
} |
|
|
|
else if (ucl_object_type (elt) == UCL_INT) { |
|
|
|
backend->dbname = ucl_object_tostring_forced (elt); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
backend->dbname = NULL; |
|
|
|
} |
|
|
|
|
|
|
|
return TRUE; |
|
|
|
} |
|
|
|
|
|
|
|
static void |
|
|
|
rspamd_fuzzy_backend_redis_dtor (struct rspamd_fuzzy_backend_redis *backend) |
|
|
|
{ |
|
|
|
if (backend->read_servers) { |
|
|
|
rspamd_upstreams_destroy (backend->read_servers); |
|
|
|
} |
|
|
|
if (backend->write_servers) { |
|
|
|
rspamd_upstreams_destroy (backend->write_servers); |
|
|
|
lua_State *L = backend->L; |
|
|
|
|
|
|
|
if (backend->conf_ref) { |
|
|
|
luaL_unref (L, LUA_REGISTRYINDEX, backend->conf_ref); |
|
|
|
} |
|
|
|
|
|
|
|
if (backend->id) { |
|
|
@@ -245,13 +178,16 @@ rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
gboolean ret = FALSE; |
|
|
|
guchar id_hash[rspamd_cryptobox_HASHBYTES]; |
|
|
|
rspamd_cryptobox_hash_state_t st; |
|
|
|
lua_State *L = (lua_State *)cfg->lua_state; |
|
|
|
gint conf_ref = -1; |
|
|
|
|
|
|
|
backend = g_malloc0 (sizeof (*backend)); |
|
|
|
|
|
|
|
backend->timeout = REDIS_DEFAULT_TIMEOUT; |
|
|
|
backend->redis_object = REDIS_DEFAULT_OBJECT; |
|
|
|
backend->L = L; |
|
|
|
|
|
|
|
ret = rspamd_fuzzy_backend_redis_try_ucl (backend, obj, cfg); |
|
|
|
ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref); |
|
|
|
|
|
|
|
/* Now try global redis settings */ |
|
|
|
if (!ret) { |
|
|
@@ -264,11 +200,10 @@ rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
NULL); |
|
|
|
|
|
|
|
if (specific_obj) { |
|
|
|
ret = rspamd_fuzzy_backend_redis_try_ucl (backend, specific_obj, |
|
|
|
cfg); |
|
|
|
ret = rspamd_lua_try_load_redis (L, specific_obj, cfg, &conf_ref); |
|
|
|
} |
|
|
|
else { |
|
|
|
ret = rspamd_fuzzy_backend_redis_try_ucl (backend, elt, cfg); |
|
|
|
ret = rspamd_lua_try_load_redis (L, elt, cfg, &conf_ref); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@@ -280,6 +215,44 @@ rspamd_fuzzy_backend_init_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
return NULL; |
|
|
|
} |
|
|
|
|
|
|
|
elt = ucl_object_lookup (obj, "prefix"); |
|
|
|
if (elt == NULL || ucl_object_type (elt) != UCL_STRING) { |
|
|
|
backend->redis_object = REDIS_DEFAULT_OBJECT; |
|
|
|
} |
|
|
|
else { |
|
|
|
backend->redis_object = ucl_object_tostring (elt); |
|
|
|
} |
|
|
|
|
|
|
|
backend->conf_ref = conf_ref; |
|
|
|
|
|
|
|
/* Check some common table values */ |
|
|
|
lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref); |
|
|
|
|
|
|
|
lua_pushstring (L, "timeout"); |
|
|
|
lua_gettable (L, -2); |
|
|
|
if (lua_type (L, -1) == LUA_TNUMBER) { |
|
|
|
backend->timeout = lua_tonumber (L, -1); |
|
|
|
} |
|
|
|
lua_pop (L, 1); |
|
|
|
|
|
|
|
lua_pushstring (L, "db"); |
|
|
|
lua_gettable (L, -2); |
|
|
|
if (lua_type (L, -1) == LUA_TSTRING) { |
|
|
|
backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool, |
|
|
|
lua_tostring (L, -1)); |
|
|
|
} |
|
|
|
lua_pop (L, 1); |
|
|
|
|
|
|
|
lua_pushstring (L, "password"); |
|
|
|
lua_gettable (L, -2); |
|
|
|
if (lua_type (L, -1) == LUA_TSTRING) { |
|
|
|
backend->password = rspamd_mempool_strdup (cfg->cfg_pool, |
|
|
|
lua_tostring (L, -1)); |
|
|
|
} |
|
|
|
lua_pop (L, 1); |
|
|
|
|
|
|
|
lua_settop (L, 0); |
|
|
|
|
|
|
|
REF_INIT_RETAIN (backend, rspamd_fuzzy_backend_redis_dtor); |
|
|
|
backend->pool = cfg->redis_pool; |
|
|
|
rspamd_cryptobox_hash_init (&st, NULL, 0); |
|
|
@@ -628,6 +601,7 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
struct rspamd_fuzzy_backend_redis *backend = subr_ud; |
|
|
|
struct rspamd_fuzzy_redis_session *session; |
|
|
|
struct upstream *up; |
|
|
|
struct upstream_list *ups; |
|
|
|
struct timeval tv; |
|
|
|
rspamd_inet_addr_t *addr; |
|
|
|
struct rspamd_fuzzy_reply rep; |
|
|
@@ -667,7 +641,8 @@ rspamd_fuzzy_backend_check_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
session->argv_lens[4] = 1; |
|
|
|
g_string_free (key, FALSE); /* Do not free underlying array */ |
|
|
|
|
|
|
|
up = rspamd_upstream_get (backend->read_servers, |
|
|
|
ups = rspamd_redis_get_servers (backend, "read_servers"); |
|
|
|
up = rspamd_upstream_get (ups, |
|
|
|
RSPAMD_UPSTREAM_ROUND_ROBIN, |
|
|
|
NULL, |
|
|
|
0); |
|
|
@@ -765,6 +740,7 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
struct rspamd_fuzzy_backend_redis *backend = subr_ud; |
|
|
|
struct rspamd_fuzzy_redis_session *session; |
|
|
|
struct upstream *up; |
|
|
|
struct upstream_list *ups; |
|
|
|
struct timeval tv; |
|
|
|
rspamd_inet_addr_t *addr; |
|
|
|
GString *key; |
|
|
@@ -791,7 +767,8 @@ rspamd_fuzzy_backend_count_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
session->argv_lens[1] = key->len; |
|
|
|
g_string_free (key, FALSE); /* Do not free underlying array */ |
|
|
|
|
|
|
|
up = rspamd_upstream_get (backend->read_servers, |
|
|
|
ups = rspamd_redis_get_servers (backend, "read_servers"); |
|
|
|
up = rspamd_upstream_get (ups, |
|
|
|
RSPAMD_UPSTREAM_ROUND_ROBIN, |
|
|
|
NULL, |
|
|
|
0); |
|
|
@@ -888,6 +865,7 @@ rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
struct rspamd_fuzzy_backend_redis *backend = subr_ud; |
|
|
|
struct rspamd_fuzzy_redis_session *session; |
|
|
|
struct upstream *up; |
|
|
|
struct upstream_list *ups; |
|
|
|
struct timeval tv; |
|
|
|
rspamd_inet_addr_t *addr; |
|
|
|
GString *key; |
|
|
@@ -914,7 +892,8 @@ rspamd_fuzzy_backend_version_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
session->argv_lens[1] = key->len; |
|
|
|
g_string_free (key, FALSE); /* Do not free underlying array */ |
|
|
|
|
|
|
|
up = rspamd_upstream_get (backend->read_servers, |
|
|
|
ups = rspamd_redis_get_servers (backend, "read_servers"); |
|
|
|
up = rspamd_upstream_get (ups, |
|
|
|
RSPAMD_UPSTREAM_ROUND_ROBIN, |
|
|
|
NULL, |
|
|
|
0); |
|
|
@@ -1376,6 +1355,7 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
struct rspamd_fuzzy_backend_redis *backend = subr_ud; |
|
|
|
struct rspamd_fuzzy_redis_session *session; |
|
|
|
struct upstream *up; |
|
|
|
struct upstream_list *ups; |
|
|
|
struct timeval tv; |
|
|
|
rspamd_inet_addr_t *addr; |
|
|
|
guint i; |
|
|
@@ -1472,7 +1452,8 @@ rspamd_fuzzy_backend_update_redis (struct rspamd_fuzzy_backend *bk, |
|
|
|
session->argv = g_malloc0 (sizeof (gchar *) * session->nargs); |
|
|
|
session->argv_lens = g_malloc0 (sizeof (gsize) * session->nargs); |
|
|
|
|
|
|
|
up = rspamd_upstream_get (backend->write_servers, |
|
|
|
ups = rspamd_redis_get_servers (backend, "write_servers"); |
|
|
|
up = rspamd_upstream_get (ups, |
|
|
|
RSPAMD_UPSTREAM_MASTER_SLAVE, |
|
|
|
NULL, |
|
|
|
0); |