]> source.dussan.org Git - rspamd.git/commitdiff
Add lazy stat implementation for redis
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 9 Jan 2016 16:08:08 +0000 (16:08 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 9 Jan 2016 16:08:08 +0000 (16:08 +0000)
src/libstat/backends/redis_backend.c

index cf6f90d06fbeb00356c00be2fd6ff83ebc22913d..21e441abc4b8d07afe755d165dec3664b10fdfb2 100644 (file)
@@ -72,6 +72,8 @@ struct redis_stat_runtime {
 struct rspamd_redis_stat_cbdata;
 
 struct rspamd_redis_stat_elt {
+       struct redis_stat_ctx *ctx;
+       struct rspamd_stat_async_elt *async;
        struct event_base *ev_base;
        ucl_object_t *stat;
        struct rspamd_redis_stat_cbdata *cbdata;
@@ -376,7 +378,6 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
 
        if (cbdata) {
                redisAsyncFree (cbdata->redis);
-               ucl_object_unref (cbdata->cur);
 
                for (i = 0; i < cbdata->cur_keys->len; i ++) {
                        k = g_ptr_array_index (cbdata->cur_keys, i);
@@ -387,21 +388,108 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
 
                if (cbdata->elt) {
                        cbdata->elt->cbdata = NULL;
+                       /* Re-enable parent event */
+                       cbdata->elt->async->enabled = TRUE;
+
+                       /* Replace ucl object */
+                       if (cbdata->elt->stat && cbdata->cur) {
+                               ucl_object_unref (cbdata->elt->stat);
+                               cbdata->elt->stat = cbdata->cur;
+                               cbdata->cur = NULL;
+                       }
+               }
+
+               if (cbdata->cur) {
+                       ucl_object_unref (cbdata->cur);
                }
 
                g_slice_free1 (sizeof (*cbdata), cbdata);
        }
 }
 
+/* Called when we get number of learns for a specific key */
+static void
+rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+       struct rspamd_redis_stat_cbdata *cbdata = priv;
+       redisReply *reply = r;
+       ucl_object_t *obj;
+       gulong num;
+
+       cbdata->inflight --;
+
+       if (c->err == 0 && r != NULL) {
+               if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
+                       num = reply->integer;
+               }
+               else if (reply->type == REDIS_REPLY_STRING) {
+                       rspamd_strtoul (reply->str, reply->len, &num);
+               }
+
+               obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "used");
+               if (obj) {
+                       obj->value.iv += num;
+               }
+
+               obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "total");
+               if (obj) {
+                       obj->value.iv += num;
+               }
+
+               obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "size");
+               if (obj) {
+                       /* Size of key + size of int64_t */
+                       obj->value.iv += num * (sizeof (G_STRINGIFY (G_MAXINT64)) +
+                                       sizeof (guint64) + sizeof (gpointer));
+               }
+       }
+
+       if (cbdata->inflight == 0) {
+               rspamd_redis_async_cbdata_cleanup (cbdata);
+       }
+}
+
+/* Called when we get number of elements for a specific key */
+static void
+rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+       struct rspamd_redis_stat_cbdata *cbdata = priv;
+       redisReply *reply = r;
+       ucl_object_t *obj;
+       gulong num;
+
+       cbdata->inflight --;
+
+       if (c->err == 0 && r != NULL) {
+               if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
+                       num = reply->integer;
+               }
+               else if (reply->type == REDIS_REPLY_STRING) {
+                       rspamd_strtoul (reply->str, reply->len, &num);
+               }
+
+               obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "revision");
+               if (obj) {
+                       obj->value.iv += num;
+               }
+       }
+
+       if (cbdata->inflight == 0) {
+               rspamd_redis_async_cbdata_cleanup (cbdata);
+       }
+}
+
 /* Called when we have connected to the redis server and got keys to check */
 static void
 rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
 {
        struct rspamd_redis_stat_cbdata *cbdata = priv;
        redisReply *reply = r, *elt;
-       gchar **k;
+       gchar **pk, *k;
        guint i, processed = 0;
 
+       cbdata->inflight --;
+
        if (c->err == 0 && r != NULL) {
                if (reply->type == REDIS_REPLY_ARRAY) {
                        g_ptr_array_set_size (cbdata->cur_keys, reply->elements);
@@ -410,23 +498,62 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
                                elt = reply->element[i];
 
                                if (elt->type == REDIS_REPLY_STRING) {
-                                       k = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
-                                       *k = g_malloc (elt->len + 1);
-                                       rspamd_strlcpy (*k, elt->str, elt->len + 1);
+                                       pk = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
+                                       *pk = g_malloc (elt->len + 1);
+                                       rspamd_strlcpy (*pk, elt->str, elt->len + 1);
                                        processed ++;
                                }
                        }
 
                        if (processed) {
+                               for (i = 0; i < cbdata->cur_keys->len; i ++) {
+                                       k = (gchar *)g_ptr_array_index (cbdata->cur_keys, i);
+
+                                       if (k) {
+                                               redisAsyncCommand (cbdata->redis, rspamd_redis_stat_key,
+                                                               cbdata,
+                                                               "HLEN %s",
+                                                               k);
+                                               redisAsyncCommand (cbdata->redis, rspamd_redis_stat_learns,
+                                                               cbdata,
+                                                               "HGET %s learns",
+                                                               k);
+                                               cbdata->inflight += 2;
+                                       }
+                               }
 
+                               /* Set up the required keys */
+                               ucl_object_insert_key (cbdata->cur,
+                                               ucl_object_typed_new (UCL_INT), "revision", 0, false);
+                               ucl_object_insert_key (cbdata->cur,
+                                               ucl_object_typed_new (UCL_INT), "used", 0, false);
+                               ucl_object_insert_key (cbdata->cur,
+                                               ucl_object_typed_new (UCL_INT), "total", 0, false);
+                               ucl_object_insert_key (cbdata->cur,
+                                               ucl_object_typed_new (UCL_INT), "size", 0, false);
+                               ucl_object_insert_key (cbdata->cur,
+                                               ucl_object_fromstring (cbdata->elt->ctx->stcf->symbol),
+                                               "symbol", 0, false);
+                               ucl_object_insert_key (cbdata->cur, ucl_object_fromstring ("redis"),
+                                               "type", 0, false);
+                               ucl_object_insert_key (cbdata->cur, ucl_object_fromint (0),
+                                               "languages", 0, false);
+                               ucl_object_insert_key (cbdata->cur, ucl_object_fromint (processed),
+                                               "users", 0, false);
                        }
                }
+
+               rspamd_upstream_ok (cbdata->selected);
        }
        else {
                msg_err ("cannot get keys to gather stat");
                rspamd_upstream_fail (cbdata->selected);
                rspamd_redis_async_cbdata_cleanup (cbdata);
        }
+
+       if (cbdata->inflight == 0) {
+               rspamd_redis_async_cbdata_cleanup (cbdata);
+       }
 }
 
 static void
@@ -745,11 +872,13 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx,
 
        st_elt = g_slice_alloc0 (sizeof (*st_elt));
        st_elt->ev_base = ctx->ev_base;
+       st_elt->ctx = backend;
        backend->stat_elt = rspamd_stat_ctx_register_async (
                        rspamd_redis_async_stat_cb,
                        rspamd_redis_async_stat_fin,
                        st_elt,
                        REDIS_STAT_TIMEOUT);
+       st_elt->async = backend->stat_elt;
 
        return (gpointer)backend;
 }
@@ -1071,6 +1200,15 @@ rspamd_redis_get_stat (gpointer runtime,
                gpointer ctx)
 {
        struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
+       struct rspamd_redis_stat_elt *st;
+
+       if (rt->ctx->stat_elt) {
+               st = rt->ctx->stat_elt->ud;
+
+               if (st->stat) {
+                       return ucl_object_ref (st->stat);
+               }
+       }
 
        return NULL;
 }
@@ -1079,8 +1217,6 @@ gpointer
 rspamd_redis_load_tokenizer_config (gpointer runtime,
                gsize *len)
 {
-       struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
-
        return NULL;
 }