From c3e917ac341171ba51739a8c4c6a42b68be1f972 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 9 Jan 2016 16:08:08 +0000 Subject: [PATCH] Add lazy stat implementation for redis --- src/libstat/backends/redis_backend.c | 150 +++++++++++++++++++++++++-- 1 file changed, 143 insertions(+), 7 deletions(-) diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index cf6f90d06..21e441abc 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -72,6 +72,8 @@ struct redis_stat_runtime { struct rspamd_redis_stat_cbdata; struct rspamd_redis_stat_elt { + struct redis_stat_ctx *ctx; + struct rspamd_stat_async_elt *async; struct event_base *ev_base; ucl_object_t *stat; struct rspamd_redis_stat_cbdata *cbdata; @@ -376,7 +378,6 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata) if (cbdata) { redisAsyncFree (cbdata->redis); - ucl_object_unref (cbdata->cur); for (i = 0; i < cbdata->cur_keys->len; i ++) { k = g_ptr_array_index (cbdata->cur_keys, i); @@ -387,21 +388,108 @@ rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata) if (cbdata->elt) { cbdata->elt->cbdata = NULL; + /* Re-enable parent event */ + cbdata->elt->async->enabled = TRUE; + + /* Replace ucl object */ + if (cbdata->elt->stat && cbdata->cur) { + ucl_object_unref (cbdata->elt->stat); + cbdata->elt->stat = cbdata->cur; + cbdata->cur = NULL; + } + } + + if (cbdata->cur) { + ucl_object_unref (cbdata->cur); } g_slice_free1 (sizeof (*cbdata), cbdata); } } +/* Called when we get number of learns for a specific key */ +static void +rspamd_redis_stat_learns (redisAsyncContext *c, gpointer r, gpointer priv) +{ + struct rspamd_redis_stat_cbdata *cbdata = priv; + redisReply *reply = r; + ucl_object_t *obj; + gulong num; + + cbdata->inflight --; + + if (c->err == 0 && r != NULL) { + if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) { + num = reply->integer; + } + else if (reply->type == REDIS_REPLY_STRING) { + rspamd_strtoul (reply->str, reply->len, &num); + } + + obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "used"); + if (obj) { + obj->value.iv += num; + } + + obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "total"); + if (obj) { + obj->value.iv += num; + } + + obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "size"); + if (obj) { + /* Size of key + size of int64_t */ + obj->value.iv += num * (sizeof (G_STRINGIFY (G_MAXINT64)) + + sizeof (guint64) + sizeof (gpointer)); + } + } + + if (cbdata->inflight == 0) { + rspamd_redis_async_cbdata_cleanup (cbdata); + } +} + +/* Called when we get number of elements for a specific key */ +static void +rspamd_redis_stat_key (redisAsyncContext *c, gpointer r, gpointer priv) +{ + struct rspamd_redis_stat_cbdata *cbdata = priv; + redisReply *reply = r; + ucl_object_t *obj; + gulong num; + + cbdata->inflight --; + + if (c->err == 0 && r != NULL) { + if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) { + num = reply->integer; + } + else if (reply->type == REDIS_REPLY_STRING) { + rspamd_strtoul (reply->str, reply->len, &num); + } + + obj = (ucl_object_t *)ucl_object_find_key (cbdata->cur, "revision"); + if (obj) { + obj->value.iv += num; + } + } + + if (cbdata->inflight == 0) { + rspamd_redis_async_cbdata_cleanup (cbdata); + } +} + /* Called when we have connected to the redis server and got keys to check */ static void rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv) { struct rspamd_redis_stat_cbdata *cbdata = priv; redisReply *reply = r, *elt; - gchar **k; + gchar **pk, *k; guint i, processed = 0; + cbdata->inflight --; + if (c->err == 0 && r != NULL) { if (reply->type == REDIS_REPLY_ARRAY) { g_ptr_array_set_size (cbdata->cur_keys, reply->elements); @@ -410,23 +498,62 @@ rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv) elt = reply->element[i]; if (elt->type == REDIS_REPLY_STRING) { - k = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i); - *k = g_malloc (elt->len + 1); - rspamd_strlcpy (*k, elt->str, elt->len + 1); + pk = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i); + *pk = g_malloc (elt->len + 1); + rspamd_strlcpy (*pk, elt->str, elt->len + 1); processed ++; } } if (processed) { + for (i = 0; i < cbdata->cur_keys->len; i ++) { + k = (gchar *)g_ptr_array_index (cbdata->cur_keys, i); + + if (k) { + redisAsyncCommand (cbdata->redis, rspamd_redis_stat_key, + cbdata, + "HLEN %s", + k); + redisAsyncCommand (cbdata->redis, rspamd_redis_stat_learns, + cbdata, + "HGET %s learns", + k); + cbdata->inflight += 2; + } + } + /* Set up the required keys */ + ucl_object_insert_key (cbdata->cur, + ucl_object_typed_new (UCL_INT), "revision", 0, false); + ucl_object_insert_key (cbdata->cur, + ucl_object_typed_new (UCL_INT), "used", 0, false); + ucl_object_insert_key (cbdata->cur, + ucl_object_typed_new (UCL_INT), "total", 0, false); + ucl_object_insert_key (cbdata->cur, + ucl_object_typed_new (UCL_INT), "size", 0, false); + ucl_object_insert_key (cbdata->cur, + ucl_object_fromstring (cbdata->elt->ctx->stcf->symbol), + "symbol", 0, false); + ucl_object_insert_key (cbdata->cur, ucl_object_fromstring ("redis"), + "type", 0, false); + ucl_object_insert_key (cbdata->cur, ucl_object_fromint (0), + "languages", 0, false); + ucl_object_insert_key (cbdata->cur, ucl_object_fromint (processed), + "users", 0, false); } } + + rspamd_upstream_ok (cbdata->selected); } else { msg_err ("cannot get keys to gather stat"); rspamd_upstream_fail (cbdata->selected); rspamd_redis_async_cbdata_cleanup (cbdata); } + + if (cbdata->inflight == 0) { + rspamd_redis_async_cbdata_cleanup (cbdata); + } } static void @@ -745,11 +872,13 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, st_elt = g_slice_alloc0 (sizeof (*st_elt)); st_elt->ev_base = ctx->ev_base; + st_elt->ctx = backend; backend->stat_elt = rspamd_stat_ctx_register_async ( rspamd_redis_async_stat_cb, rspamd_redis_async_stat_fin, st_elt, REDIS_STAT_TIMEOUT); + st_elt->async = backend->stat_elt; return (gpointer)backend; } @@ -1071,6 +1200,15 @@ rspamd_redis_get_stat (gpointer runtime, gpointer ctx) { struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); + struct rspamd_redis_stat_elt *st; + + if (rt->ctx->stat_elt) { + st = rt->ctx->stat_elt->ud; + + if (st->stat) { + return ucl_object_ref (st->stat); + } + } return NULL; } @@ -1079,8 +1217,6 @@ gpointer rspamd_redis_load_tokenizer_config (gpointer runtime, gsize *len) { - struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); - return NULL; } -- 2.39.5