#define REDIS_DEFAULT_OBJECT "%s%l"
#define REDIS_DEFAULT_TIMEOUT 0.5
-struct redis_stat_ctx_elt {
+struct redis_stat_ctx {
struct upstream_list *read_servers;
struct upstream_list *write_servers;
gdouble timeout;
};
-struct redis_stat_ctx {
- GHashTable *redis_elts;
-};
-
struct redis_stat_runtime {
+ struct redis_stat_ctx *ctx;
struct rspamd_task *task;
struct upstream *selected;
struct event timeout_event;
}
gpointer
-rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
+rspamd_redis_init (struct rspamd_stat_ctx *ctx,
+ struct rspamd_config *cfg, struct rspamd_statfile *st)
{
- struct redis_stat_ctx *new;
- struct redis_stat_ctx_elt *backend;
- struct rspamd_classifier_config *clf;
- struct rspamd_statfile_config *stf;
- GList *cur, *curst;
+ struct redis_stat_ctx *backend;
+ struct rspamd_statfile_config *stf = st->stcf;
const ucl_object_t *elt;
- new = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*new));
- new->redis_elts = g_hash_table_new (g_direct_hash, g_direct_equal);
-
- /* Iterate over all classifiers and load matching statfiles */
- cur = cfg->classifiers;
-
- while (cur) {
- clf = cur->data;
-
- if (clf->backend != NULL && strcmp (clf->backend, REDIS_BACKEND_TYPE)) {
-
- curst = clf->statfiles;
- while (curst) {
- stf = curst->data;
-
- /*
- * By default, all statfiles are treated as mmaped files
- */
-
- /*
- * Check configuration sanity
- */
- backend = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*backend));
-
- elt = ucl_object_find_key (stf->opts, "read_servers");
- if (elt == NULL) {
- elt = ucl_object_find_key (stf->opts, "servers");
- }
- if (elt == NULL) {
- msg_err ("statfile %s has no redis servers", stf->symbol);
- curst = curst->next;
- continue;
- }
- else {
- backend->read_servers = rspamd_upstreams_create (cfg->ups_ctx);
- if (!rspamd_upstreams_from_ucl (backend->read_servers, elt,
- REDIS_DEFAULT_PORT, NULL)) {
- msg_err ("statfile %s cannot read servers configuration",
- stf->symbol);
- curst = curst->next;
- continue;
- }
- }
-
- elt = ucl_object_find_key (stf->opts, "write_servers");
- if (elt == NULL) {
- msg_err ("statfile %s has no write redis servers, "
- "so learning is impossible", stf->symbol);
- curst = curst->next;
- }
- else {
- backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx);
- if (!rspamd_upstreams_from_ucl (backend->write_servers, elt,
- REDIS_DEFAULT_PORT, NULL)) {
- msg_err ("statfile %s cannot write servers configuration",
- stf->symbol);
- rspamd_upstreams_destroy (backend->write_servers);
- backend->write_servers = NULL;
- }
- }
-
- elt = ucl_object_find_key (stf->opts, "prefix");
- if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
- backend->redis_object = REDIS_DEFAULT_OBJECT;
- }
- else {
- /* XXX: sanity check */
- backend->redis_object = ucl_object_tostring (elt);
- if (rspamd_redis_expand_object (backend->redis_object, stf,
- NULL, NULL) == 0) {
- msg_err ("statfile %s cannot write servers configuration",
- stf->symbol);
- }
- }
+ backend = rspamd_mempool_alloc0 (cfg->cfg_pool, sizeof (*backend));
- elt = ucl_object_find_key (stf->opts, "timeout");
- if (elt) {
- backend->timeout = ucl_object_todouble (elt);
- }
- else {
- backend->timeout = REDIS_DEFAULT_TIMEOUT;
- }
+ elt = ucl_object_find_key (stf->opts, "read_servers");
+ if (elt == NULL) {
+ elt = ucl_object_find_key (stf->opts, "servers");
+ }
+ if (elt == NULL) {
+ msg_err ("statfile %s has no redis servers", stf->symbol);
- g_hash_table_insert (new->redis_elts, stf, backend);
+ return NULL;
+ }
+ else {
+ backend->read_servers = rspamd_upstreams_create (cfg->ups_ctx);
+ if (!rspamd_upstreams_from_ucl (backend->read_servers, elt,
+ REDIS_DEFAULT_PORT, NULL)) {
+ msg_err ("statfile %s cannot read servers configuration",
+ stf->symbol);
+ return NULL;
+ }
+ }
- ctx->statfiles ++;
+ elt = ucl_object_find_key (stf->opts, "write_servers");
+ if (elt == NULL) {
+ msg_err ("statfile %s has no write redis servers, "
+ "so learning is impossible", stf->symbol);
+ backend->write_servers = NULL;
+ }
+ else {
+ backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx);
+ if (!rspamd_upstreams_from_ucl (backend->write_servers, elt,
+ REDIS_DEFAULT_PORT, NULL)) {
+ msg_err ("statfile %s cannot write servers configuration",
+ stf->symbol);
+ rspamd_upstreams_destroy (backend->write_servers);
+ backend->write_servers = NULL;
+ }
+ }
- curst = curst->next;
- }
+ elt = ucl_object_find_key (stf->opts, "prefix");
+ if (elt == NULL || ucl_object_type (elt) != UCL_STRING) {
+ backend->redis_object = REDIS_DEFAULT_OBJECT;
+ }
+ else {
+ /* XXX: sanity check */
+ backend->redis_object = ucl_object_tostring (elt);
+ if (rspamd_redis_expand_object (backend->redis_object, stf,
+ NULL, NULL) == 0) {
+ msg_err ("statfile %s cannot write servers configuration",
+ stf->symbol);
}
+ }
- cur = g_list_next (cur);
+ elt = ucl_object_find_key (stf->opts, "timeout");
+ if (elt) {
+ backend->timeout = ucl_object_todouble (elt);
}
+ else {
+ backend->timeout = REDIS_DEFAULT_TIMEOUT;
+ }
+
- return (gpointer)new;
+ return (gpointer)backend;
}
gpointer
rspamd_redis_runtime (struct rspamd_task *task,
- struct rspamd_statfile_config *stcf,
+ struct rspamd_statfile_config *stcf, \
gboolean learn, gpointer c)
{
struct redis_stat_ctx *ctx = REDIS_CTX (c);
- struct redis_stat_ctx_elt *elt;
struct redis_stat_runtime *rt;
struct upstream *up;
rspamd_inet_addr_t *addr;
g_assert (ctx != NULL);
g_assert (stcf != NULL);
- elt = g_hash_table_lookup (ctx->redis_elts, stcf);
- g_assert (elt != NULL);
-
- if (learn && elt->write_servers == NULL) {
+ if (learn && ctx->write_servers == NULL) {
msg_err ("no write servers defined for %s, cannot learn", stcf->symbol);
return NULL;
}
if (learn) {
- up = rspamd_upstream_get (elt->write_servers,
+ up = rspamd_upstream_get (ctx->write_servers,
RSPAMD_UPSTREAM_MASTER_SLAVE,
NULL,
0);
}
else {
- up = rspamd_upstream_get (elt->read_servers,
+ up = rspamd_upstream_get (ctx->read_servers,
RSPAMD_UPSTREAM_ROUND_ROBIN,
NULL,
0);
}
rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
- rspamd_redis_expand_object (elt->redis_object, stcf, task,
+ rspamd_redis_expand_object (ctx->redis_object, stcf, task,
&rt->redis_object_expanded);
rt->selected = up;
rt->task = task;
/* Now check stats */
event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
event_base_set (task->ev_base, &rt->timeout_event);
- double_to_tv (elt->timeout, &tv);
+ double_to_tv (ctx->timeout, &tv);
event_add (&rt->timeout_event, &tv);
redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
rt->redis_object_expanded, "learned");