diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-11-23 14:08:58 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2018-11-23 16:10:28 +0000 |
commit | 1fffe523f6fbf4dea8444f984f07fb453c8669e1 (patch) | |
tree | 8822a1e70bf25968179eb37b2140d6e97ef9687c /src/libstat | |
parent | ee8a7f23d9b5b932a0acba5cf9ce319d222c3052 (diff) | |
download | rspamd-1fffe523f6fbf4dea8444f984f07fb453c8669e1.tar.gz rspamd-1fffe523f6fbf4dea8444f984f07fb453c8669e1.zip |
[Project] Use lua_redis to configure servers in bayes Redis backend
Diffstat (limited to 'src/libstat')
-rw-r--r-- | src/libstat/backends/redis_backend.c | 204 |
1 files changed, 97 insertions, 107 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index 9e6c5f46b..20144a7ec 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -42,9 +42,9 @@ INIT_LOG_MODULE(stat_redis) #define REDIS_STAT_TIMEOUT 30 struct redis_stat_ctx { + lua_State *L; struct rspamd_statfile_config *stcf; - struct upstream_list *read_servers; - struct upstream_list *write_servers; + gint conf_ref; struct rspamd_stat_async_elt *stat_elt; const gchar *redis_object; const gchar *password; @@ -113,6 +113,22 @@ rspamd_redis_stat_quark (void) return g_quark_from_static_string (M); } +static inline struct upstream_list * +rspamd_redis_get_servers (struct redis_stat_ctx *ctx, + const gchar *what) +{ + lua_State *L = ctx->L; + struct upstream_list *res; + + lua_rawgeti (L, LUA_REGISTRYINDEX, ctx->conf_ref); + lua_pushstring (L, what); + lua_gettable (L, -2); + res = *((struct upstream_list**)lua_touserdata (L, -1)); + lua_settop (L, 0); + + return res; +} + /* * Non-static for lua unit testing */ @@ -939,6 +955,7 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d) struct rspamd_redis_stat_elt *redis_elt = elt->ud; struct rspamd_redis_stat_cbdata *cbdata; rspamd_inet_addr_t *addr; + struct upstream_list *ups; g_assert (redis_elt != NULL); @@ -952,8 +969,15 @@ rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d) /* Disable further events unless needed */ elt->enabled = FALSE; + ups = rspamd_redis_get_servers (ctx, "read_servers"); + + if (!ups) { + return; + } + cbdata = g_malloc0 (sizeof (*cbdata)); - cbdata->selected = rspamd_upstream_get (ctx->read_servers, + + cbdata->selected = rspamd_upstream_get (ups, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); @@ -1250,78 +1274,6 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv) rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt); } } - -static gboolean -rspamd_redis_try_ucl (struct redis_stat_ctx *backend, - const ucl_object_t *obj, - struct rspamd_config *cfg, - const gchar *symbol) -{ - const ucl_object_t *elt, *relt; - - elt = ucl_object_lookup_any (obj, "read_servers", "servers", NULL); - - if (elt == NULL) { - return FALSE; - } - - backend->read_servers = rspamd_upstreams_create (cfg->ups_ctx); - if (!rspamd_upstreams_from_ucl (backend->read_servers, elt, - REDIS_DEFAULT_PORT, NULL)) { - msg_err ("statfile %s cannot get read servers configuration", - symbol); - return FALSE; - } - - relt = elt; - - elt = ucl_object_lookup (obj, "write_servers"); - if (elt == NULL) { - /* Use read servers as write ones */ - g_assert (relt != NULL); - backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx); - if (!rspamd_upstreams_from_ucl (backend->write_servers, relt, - REDIS_DEFAULT_PORT, NULL)) { - msg_err ("statfile %s cannot get write servers configuration", - symbol); - return FALSE; - } - } - else { - backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx); - if (!rspamd_upstreams_from_ucl (backend->write_servers, elt, - REDIS_DEFAULT_PORT, NULL)) { - msg_err ("statfile %s cannot get write servers configuration", - symbol); - rspamd_upstreams_destroy (backend->write_servers); - backend->write_servers = NULL; - } - } - - elt = ucl_object_lookup_any (obj, "db", "database", "dbname", NULL); - if (elt) { - if (ucl_object_type (elt) == UCL_STRING) { - backend->dbname = ucl_object_tostring (elt); - } - else if (ucl_object_type (elt) == UCL_INT) { - backend->dbname = ucl_object_tostring_forced (elt); - } - } - else { - backend->dbname = NULL; - } - - elt = ucl_object_lookup (obj, "password"); - if (elt) { - backend->password = ucl_object_tostring (elt); - } - else { - backend->password = NULL; - } - - return TRUE; -} - static void rspamd_redis_parse_classifier_opts (struct redis_stat_ctx *backend, const ucl_object_t *obj, @@ -1379,14 +1331,6 @@ rspamd_redis_parse_classifier_opts (struct redis_stat_ctx *backend, backend->redis_object = ucl_object_tostring (elt); } - elt = ucl_object_lookup (obj, "timeout"); - if (elt) { - backend->timeout = ucl_object_todouble (elt); - } - else { - backend->timeout = REDIS_DEFAULT_TIMEOUT; - } - elt = ucl_object_lookup (obj, "store_tokens"); if (elt) { backend->store_tokens = ucl_object_toboolean (elt); @@ -1433,24 +1377,27 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_redis_stat_elt *st_elt; const ucl_object_t *obj; gboolean ret = FALSE; + gint conf_ref = -1; + lua_State *L = (lua_State *)cfg->lua_state; backend = g_malloc0 (sizeof (*backend)); + backend->L = L; + backend->timeout = REDIS_DEFAULT_TIMEOUT; /* First search in backend configuration */ obj = ucl_object_lookup (st->classifier->cfg->opts, "backend"); if (obj != NULL && ucl_object_type (obj) == UCL_OBJECT) { - ret = rspamd_redis_try_ucl (backend, obj, cfg, stf->symbol); + ret = rspamd_lua_try_load_redis (L, obj, cfg, &conf_ref); } /* Now try statfiles config */ - if (!ret) { - ret = rspamd_redis_try_ucl (backend, stf->opts, cfg, stf->symbol); + if (!ret && stf->opts) { + ret = rspamd_lua_try_load_redis (L, stf->opts, cfg, &conf_ref); } /* Now try classifier config */ - if (!ret) { - ret = rspamd_redis_try_ucl (backend, st->classifier->cfg->opts, cfg, - stf->symbol); + if (!ret && st->classifier->cfg->opts) { + ret = rspamd_lua_try_load_redis (L, st->classifier->cfg->opts, cfg, &conf_ref); } /* Now try global redis settings */ @@ -1463,12 +1410,12 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, specific_obj = ucl_object_lookup (obj, "statistics"); if (specific_obj) { - ret = rspamd_redis_try_ucl (backend, specific_obj, cfg, - stf->symbol); + ret = rspamd_lua_try_load_redis (L, + specific_obj, cfg, &conf_ref); } else { - ret = rspamd_redis_try_ucl (backend, obj, cfg, - stf->symbol); + ret = rspamd_lua_try_load_redis (L, + obj, cfg, &conf_ref); } } } @@ -1479,6 +1426,36 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, return NULL; } + backend->conf_ref = conf_ref; + + /* Check some common table values */ + lua_rawgeti (L, LUA_REGISTRYINDEX, conf_ref); + + lua_pushstring (L, "timeout"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TNUMBER) { + backend->timeout = lua_tonumber (L, -1); + } + lua_pop (L, 1); + + lua_pushstring (L, "db"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TSTRING) { + backend->dbname = rspamd_mempool_strdup (cfg->cfg_pool, + lua_tostring (L, -1)); + } + lua_pop (L, 1); + + lua_pushstring (L, "password"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TSTRING) { + backend->password = rspamd_mempool_strdup (cfg->cfg_pool, + lua_tostring (L, -1)); + } + lua_pop (L, 1); + + lua_settop (L, 0); + rspamd_redis_parse_classifier_opts (backend, st->classifier->cfg->opts, cfg); stf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND; backend->stcf = stf; @@ -1504,25 +1481,35 @@ rspamd_redis_runtime (struct rspamd_task *task, struct redis_stat_ctx *ctx = REDIS_CTX (c); struct redis_stat_runtime *rt; struct upstream *up; + struct upstream_list *ups; char *object_expanded = NULL; rspamd_inet_addr_t *addr; g_assert (ctx != NULL); g_assert (stcf != NULL); - if (learn && ctx->write_servers == NULL) { - msg_err_task ("no write servers defined for %s, cannot learn", stcf->symbol); - return NULL; - } - if (learn) { - up = rspamd_upstream_get (ctx->write_servers, + ups = rspamd_redis_get_servers (ctx, "write_servers"); + + if (!ups) { + msg_err_task ("no write servers defined for %s, cannot learn", + stcf->symbol); + return NULL; + } + up = rspamd_upstream_get (ups, RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, 0); } else { - up = rspamd_upstream_get (ctx->read_servers, + ups = rspamd_redis_get_servers (ctx, "read_servers"); + + if (!ups) { + msg_err_task ("no read servers defined for %s, cannot stat", + stcf->symbol); + return NULL; + } + up = rspamd_upstream_get (ups, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0); @@ -1576,13 +1563,10 @@ void rspamd_redis_close (gpointer p) { struct redis_stat_ctx *ctx = REDIS_CTX (p); + lua_State *L = ctx->L; - if (ctx->read_servers) { - rspamd_upstreams_destroy (ctx->read_servers); - } - - if (ctx->write_servers) { - rspamd_upstreams_destroy (ctx->write_servers); + if (ctx->conf_ref) { + luaL_unref (L, LUA_REGISTRYINDEX, ctx->conf_ref); } g_free (ctx); @@ -1685,6 +1669,7 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, { struct redis_stat_runtime *rt = REDIS_RUNTIME (p); struct upstream *up; + struct upstream_list *ups; rspamd_inet_addr_t *addr; struct timeval tv; rspamd_fstring_t *query; @@ -1698,7 +1683,12 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, return FALSE; } - up = rspamd_upstream_get (rt->ctx->write_servers, + ups = rspamd_redis_get_servers (rt->ctx, "write_servers"); + + if (!ups) { + return FALSE; + } + up = rspamd_upstream_get (ups, RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, 0); |