static void
rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
{
- struct rspamd_redis_stat_cbdata *cbdata = priv;
+ struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+ struct rspamd_redis_stat_cbdata *cbdata;
redisReply *reply = r;
ucl_object_t *obj;
gulong num = 0;
- if (cbdata->wanna_die) {
+ cbdata = redis_elt->cbdata;
+
+ if (cbdata == NULL || cbdata->wanna_die) {
return;
}
if (cbdata->inflight == 0) {
rspamd_redis_async_cbdata_cleanup (cbdata);
+ redis_elt->cbdata = NULL;
}
}
static void
rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
{
- struct rspamd_redis_stat_cbdata *cbdata = priv;
+ struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+ struct rspamd_redis_stat_cbdata *cbdata;
redisReply *reply = r;
ucl_object_t *obj;
glong num = 0;
- if (cbdata->wanna_die) {
+ cbdata = redis_elt->cbdata;
+
+ if (cbdata == NULL || cbdata->wanna_die) {
return;
}
if (cbdata->inflight == 0) {
rspamd_redis_async_cbdata_cleanup (cbdata);
+ redis_elt->cbdata = NULL;
}
}
static void
rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
{
- struct rspamd_redis_stat_cbdata *cbdata = priv;
+ struct rspamd_redis_stat_elt *redis_elt = (struct rspamd_redis_stat_elt *)priv;
+ struct rspamd_redis_stat_cbdata *cbdata;
redisReply *reply = r, *elt;
gchar **pk, *k;
guint i, processed = 0;
+ cbdata = redis_elt->cbdata;
- if (cbdata->wanna_die) {
+ if (cbdata == NULL || cbdata->wanna_die) {
return;
}
}
redisAsyncCommand (cbdata->redis,
rspamd_redis_stat_learns,
- cbdata,
+ redis_elt,
"HGET %s %s",
k, learned_key);
cbdata->inflight += 1;
else {
redisAsyncCommand (cbdata->redis,
rspamd_redis_stat_key,
- cbdata,
+ redis_elt,
"HLEN %s",
k);
redisAsyncCommand (cbdata->redis,
rspamd_redis_stat_learns,
- cbdata,
+ redis_elt,
"HGET %s %s",
k, learned_key);
cbdata->inflight += 2;
if (cbdata->inflight == 0) {
rspamd_redis_async_cbdata_cleanup (cbdata);
+ redis_elt->cbdata = NULL;
}
}
else {
rspamd_upstream_fail (cbdata->selected, FALSE);
rspamd_redis_async_cbdata_cleanup (cbdata);
+ redis_elt->cbdata = NULL;
}
}
struct rspamd_redis_stat_cbdata *cbdata;
rspamd_inet_addr_t *addr;
struct upstream_list *ups;
+ redisAsyncContext *redis_ctx;
+ struct upstream *selected;
g_assert (redis_elt != NULL);
if (redis_elt->cbdata) {
/* We have some other process pending */
rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+ redis_elt->cbdata = NULL;
}
/* Disable further events unless needed */
return;
}
- cbdata = g_malloc0 (sizeof (*cbdata));
-
- cbdata->selected = rspamd_upstream_get (ups,
+ selected = rspamd_upstream_get (ups,
RSPAMD_UPSTREAM_ROUND_ROBIN,
NULL,
0);
- g_assert (cbdata->selected != NULL);
- addr = rspamd_upstream_addr_next (cbdata->selected);
+ g_assert (selected != NULL);
+ addr = rspamd_upstream_addr_next (selected);
g_assert (addr != NULL);
if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
- cbdata->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
+ redis_ctx = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
}
else {
- cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+ redis_ctx = redisAsyncConnect (rspamd_inet_address_to_string (addr),
rspamd_inet_address_get_port (addr));
}
- g_assert (cbdata->redis != NULL);
+ if (redis_ctx == NULL) {
+ msg_warn ("cannot connect to redis server %s: %s",
+ rspamd_inet_address_to_string_pretty (addr),
+ strerror (errno));
- redisLibevAttach (redis_elt->event_loop, cbdata->redis);
+ return;
+ }
+ redisLibevAttach (redis_elt->event_loop, redis_ctx);
+ cbdata = g_malloc0 (sizeof (*cbdata));
+ cbdata->redis = redis_ctx;
+ cbdata->selected = selected;
cbdata->inflight = 1;
cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
cbdata->elt = redis_elt;
/* XXX: deal with timeouts maybe */
/* Get keys in redis that match our symbol */
rspamd_redis_maybe_auth (ctx, cbdata->redis);
- redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata,
+ redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, redis_elt,
"SMEMBERS %s_keys",
ctx->stcf->symbol);
}
struct rspamd_redis_stat_elt *redis_elt = elt->ud;
rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+ redis_elt->cbdata = NULL;
}
/* Called on connection termination */