#define REDIS_BACKEND_TYPE "redis"
#define REDIS_DEFAULT_PORT 6379
#define REDIS_DEFAULT_OBJECT "%s%l"
+#define REDIS_DEFAULT_TIMEOUT 0.5
struct redis_stat_ctx_elt {
struct upstream_list *read_servers;
struct redis_stat_runtime {
struct rspamd_task *task;
struct upstream *selected;
+ struct event timeout_event;
GArray *results;
gchar *redis_object_expanded;
redisAsyncContext *redis;
+ guint64 learned;
+ gboolean connected;
};
#define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)
return g_quark_from_static_string ("redis-statistics");
}
-static void
-rspamd_redis_fin (gpointer data)
-{
- struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
-
- redisAsyncFree (rt->redis);
-}
/*
* Non-static for lua unit testing
return tlen;
}
+/* Called on connection termination */
+static void
+rspamd_redis_fin (gpointer data)
+{
+ struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
+
+ redisAsyncFree (rt->redis);
+ event_del (&rt->timeout_event);
+}
+
+static void
+rspamd_redis_timeout (gint fd, short what, gpointer d)
+{
+ struct redis_stat_runtime *rt = REDIS_RUNTIME (d);
+ struct rspamd_task *task;
+
+ task = rt->task;
+
+ msg_err_task ("connection to redis server %s timed out",
+ rspamd_upstream_name (rt->selected));
+ rspamd_upstream_fail (rt->selected);
+ rspamd_session_remove_event (task->s, rspamd_redis_fin, d);
+}
+
+/* Called when we have connected to the redis server and got stats */
+static void
+rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+ struct redis_stat_runtime *rt = REDIS_RUNTIME (priv);
+ redisReply *reply = r;
+ struct rspamd_task *task;
+
+ task = rt->task;
+
+ if (c->err == 0) {
+ if (r != NULL) {
+ if (reply->type == REDIS_REPLY_INTEGER) {
+ rt->learned = reply->integer;
+ }
+ else {
+ rt->learned = 0;
+ }
+
+ rt->connected = TRUE;
+ }
+ else {
+ msg_err_task ("error getting reply from redis server %s: %s",
+ rspamd_upstream_name (rt->selected), c->errstr);
+ rspamd_upstream_fail (rt->selected);
+ rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+ }
+ }
+ else {
+ msg_err_task ("error getting reply from redis server %s: %s",
+ rspamd_upstream_name (rt->selected), c->errstr);
+ rspamd_upstream_fail (rt->selected);
+ rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+ }
+}
+
gpointer
rspamd_redis_init (struct rspamd_stat_ctx *ctx, struct rspamd_config *cfg)
{
msg_err ("statfile %s has no write redis servers, "
"so learning is impossible", stf->symbol);
curst = curst->next;
- continue;
}
else {
backend->write_servers = rspamd_upstreams_create (cfg->ups_ctx);
}
}
+ elt = ucl_object_find_key (stf->opts, "timeout");
+ if (elt) {
+ backend->timeout = ucl_object_todouble (elt);
+ }
+ else {
+ backend->timeout = REDIS_DEFAULT_TIMEOUT;
+ }
+
g_hash_table_insert (new->redis_elts, stf, backend);
ctx->statfiles ++;
struct redis_stat_runtime *rt;
struct upstream *up;
rspamd_inet_addr_t *addr;
+ struct timeval tv;
g_assert (ctx != NULL);
g_assert (stcf != NULL);
return NULL;
}
- rt = rspamd_mempool_alloc (task->task_pool, sizeof (*rt));
+ rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
rspamd_redis_expand_object (elt->redis_object, stcf, task,
&rt->redis_object_expanded);
rt->selected = up;
rspamd_session_add_event (task->s, rspamd_redis_fin, rt,
rspamd_redis_stat_quark ());
+ /* Now check stats */
+ event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt);
+ event_base_set (task->ev_base, &rt->timeout_event);
+ double_to_tv (elt->timeout, &tv);
+ event_add (&rt->timeout_event, &tv);
+ redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
+ rt->redis_object_expanded, "learned");
+
return rt;
}