aboutsummaryrefslogtreecommitdiffstats
path: root/src/libstat
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-01-09 16:08:08 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-01-09 16:08:08 +0000
commitc3e917ac341171ba51739a8c4c6a42b68be1f972 (patch)
tree1388e2bb58cc3665d2e24f78cee94f464e2b6110 /src/libstat
parent193accf7ddbf86d2b9acc351cc7faec02fa9243e (diff)
downloadrspamd-c3e917ac341171ba51739a8c4c6a42b68be1f972.tar.gz
rspamd-c3e917ac341171ba51739a8c4c6a42b68be1f972.zip
Add lazy stat implementation for redis
Diffstat (limited to 'src/libstat')
-rw-r--r--src/libstat/backends/redis_backend.c150
1 files changed, 143 insertions, 7 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index cf6f90d06..21e441abc 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -72,6 +72,8 @@ struct redis_stat_runtime {
struct rspamd_redis_stat_cbdata;
struct rspamd_redis_stat_elt {
+ struct redis_stat_ctx *ctx;
+ struct rspamd_stat_async_elt *async;
struct event_base *ev_base;
ucl_object_t *stat;
struct rspamd_redis_stat_cbdata *cbdata;
@@ -376,7 +378,6 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
if (cbdata) {
redisAsyncFree (cbdata->redis);
- ucl_object_unref (cbdata->cur);
for (i = 0; i < cbdata->cur_keys->len; i ++) {
k = g_ptr_array_index (cbdata->cur_keys, i);
@@ -387,21 +388,108 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
if (cbdata->elt) {
cbdata->elt->cbdata = NULL;
+ /* Re-enable parent event */
+ cbdata->elt->async->enabled = TRUE;
+
+ /* Replace ucl object */
+ if (cbdata->elt->stat && cbdata->cur) {
+ ucl_object_unref (cbdata->elt->stat);
+ cbdata->elt->stat = cbdata->cur;
+ cbdata->cur = NULL;
+ }
+ }
+
+ if (cbdata->cur) {
+ ucl_object_unref (cbdata->cur);
}
g_slice_free1 (sizeof (*cbdata), cbdata);
}
}
+/* Called when we get number of learns for a specific key */
+static void
+rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+ struct rspamd_redis_stat_cbdata *cbdata = priv;
+ redisReply *reply = r;
+ ucl_object_t *obj;
+ gulong num;
+
+ cbdata->inflight --;
+
+ if (c->err == 0 && r != NULL) {
+ if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
+ num = reply->integer;
+ }
+ else if (reply->type == REDIS_REPLY_STRING) {
+ rspamd_strtoul (reply->str, reply->len, &num);
+ }
+
+ obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "used");
+ if (obj) {
+ obj->value.iv += num;
+ }
+
+ obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "total");
+ if (obj) {
+ obj->value.iv += num;
+ }
+
+ obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "size");
+ if (obj) {
+ /* Size of key + size of int64_t */
+ obj->value.iv += num * (sizeof (G_STRINGIFY (G_MAXINT64)) +
+ sizeof (guint64) + sizeof (gpointer));
+ }
+ }
+
+ if (cbdata->inflight == 0) {
+ rspamd_redis_async_cbdata_cleanup (cbdata);
+ }
+}
+
+/* Called when we get number of elements for a specific key */
+static void
+rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+ struct rspamd_redis_stat_cbdata *cbdata = priv;
+ redisReply *reply = r;
+ ucl_object_t *obj;
+ gulong num;
+
+ cbdata->inflight --;
+
+ if (c->err == 0 && r != NULL) {
+ if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
+ num = reply->integer;
+ }
+ else if (reply->type == REDIS_REPLY_STRING) {
+ rspamd_strtoul (reply->str, reply->len, &num);
+ }
+
+ obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "revision");
+ if (obj) {
+ obj->value.iv += num;
+ }
+ }
+
+ if (cbdata->inflight == 0) {
+ rspamd_redis_async_cbdata_cleanup (cbdata);
+ }
+}
+
/* Called when we have connected to the redis server and got keys to check */
static void
rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
{
struct rspamd_redis_stat_cbdata *cbdata = priv;
redisReply *reply = r, *elt;
- gchar **k;
+ gchar **pk, *k;
guint i, processed = 0;
+ cbdata->inflight --;
+
if (c->err == 0 && r != NULL) {
if (reply->type == REDIS_REPLY_ARRAY) {
g_ptr_array_set_size (cbdata->cur_keys, reply->elements);
@@ -410,23 +498,62 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
elt = reply->element[i];
if (elt->type == REDIS_REPLY_STRING) {
- k = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
- *k = g_malloc (elt->len + 1);
- rspamd_strlcpy (*k, elt->str, elt->len + 1);
+ pk = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
+ *pk = g_malloc (elt->len + 1);
+ rspamd_strlcpy (*pk, elt->str, elt->len + 1);
processed ++;
}
}
if (processed) {
+ for (i = 0; i < cbdata->cur_keys->len; i ++) {
+ k = (gchar *)g_ptr_array_index (cbdata->cur_keys, i);
+
+ if (k) {
+ redisAsyncCommand (cbdata->redis, rspamd_redis_stat_key,
+ cbdata,
+ "HLEN %s",
+ k);
+ redisAsyncCommand (cbdata->redis, rspamd_redis_stat_learns,
+ cbdata,
+ "HGET %s learns",
+ k);
+ cbdata->inflight += 2;
+ }
+ }
+ /* Set up the required keys */
+ ucl_object_insert_key (cbdata->cur,
+ ucl_object_typed_new (UCL_INT), "revision", 0, false);
+ ucl_object_insert_key (cbdata->cur,
+ ucl_object_typed_new (UCL_INT), "used", 0, false);
+ ucl_object_insert_key (cbdata->cur,
+ ucl_object_typed_new (UCL_INT), "total", 0, false);
+ ucl_object_insert_key (cbdata->cur,
+ ucl_object_typed_new (UCL_INT), "size", 0, false);
+ ucl_object_insert_key (cbdata->cur,
+ ucl_object_fromstring (cbdata->elt->ctx->stcf->symbol),
+ "symbol", 0, false);
+ ucl_object_insert_key (cbdata->cur, ucl_object_fromstring ("redis"),
+ "type", 0, false);
+ ucl_object_insert_key (cbdata->cur, ucl_object_fromint (0),
+ "languages", 0, false);
+ ucl_object_insert_key (cbdata->cur, ucl_object_fromint (processed),
+ "users", 0, false);
}
}
+
+ rspamd_upstream_ok (cbdata->selected);
}
else {
msg_err ("cannot get keys to gather stat");
rspamd_upstream_fail (cbdata->selected);
rspamd_redis_async_cbdata_cleanup (cbdata);
}
+
+ if (cbdata->inflight == 0) {
+ rspamd_redis_async_cbdata_cleanup (cbdata);
+ }
}
static void
@@ -745,11 +872,13 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx,
st_elt = g_slice_alloc0 (sizeof (*st_elt));
st_elt->ev_base = ctx->ev_base;
+ st_elt->ctx = backend;
backend->stat_elt = rspamd_stat_ctx_register_async (
rspamd_redis_async_stat_cb,
rspamd_redis_async_stat_fin,
st_elt,
REDIS_STAT_TIMEOUT);
+ st_elt->async = backend->stat_elt;
return (gpointer)backend;
}
@@ -1071,6 +1200,15 @@ rspamd_redis_get_stat (gpointer runtime,
gpointer ctx)
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
+ struct rspamd_redis_stat_elt *st;
+
+ if (rt->ctx->stat_elt) {
+ st = rt->ctx->stat_elt->ud;
+
+ if (st->stat) {
+ return ucl_object_ref (st->stat);
+ }
+ }
return NULL;
}
@@ -1079,8 +1217,6 @@ gpointer
rspamd_redis_load_tokenizer_config (gpointer runtime,
gsize *len)
{
- struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
-
return NULL;
}