diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-01-09 14:43:24 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2016-01-09 14:43:24 +0000 |
commit | 193accf7ddbf86d2b9acc351cc7faec02fa9243e (patch) | |
tree | 9111d5fec943b1ea3c5d7bbb724205be09e33b9a /src | |
parent | aee3e6a68d84f7ed2047e4b9b939492008b7f7af (diff) | |
download | rspamd-193accf7ddbf86d2b9acc351cc7faec02fa9243e.tar.gz rspamd-193accf7ddbf86d2b9acc351cc7faec02fa9243e.zip |
Start implementation of lazy redis statistics.
Diffstat (limited to 'src')
-rw-r--r-- | src/libstat/backends/redis_backend.c | 147 |
1 files changed, 146 insertions, 1 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index 683ba0fda..cf6f90d06 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -37,11 +37,13 @@ #define REDIS_DEFAULT_PORT 6379 #define REDIS_DEFAULT_OBJECT "%s%l" #define REDIS_DEFAULT_TIMEOUT 0.5 +#define REDIS_STAT_TIMEOUT 30 struct redis_stat_ctx { + struct rspamd_statfile_config *stcf; struct upstream_list *read_servers; struct upstream_list *write_servers; - + struct rspamd_stat_async_elt *stat_elt; const gchar *redis_object; gdouble timeout; }; @@ -66,6 +68,24 @@ struct redis_stat_runtime { enum rspamd_redis_connection_state conn_state; }; +/* Used to get statistics from redis */ +struct rspamd_redis_stat_cbdata; + +struct rspamd_redis_stat_elt { + struct event_base *ev_base; + ucl_object_t *stat; + struct rspamd_redis_stat_cbdata *cbdata; +}; + +struct rspamd_redis_stat_cbdata { + struct rspamd_redis_stat_elt *elt; + redisAsyncContext *redis; + ucl_object_t *cur; + GPtrArray *cur_keys; + struct upstream *selected; + guint inflight; +}; + #define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt) static GQuark @@ -348,6 +368,121 @@ rspamd_redis_tokens_to_query (struct rspamd_task *task, GPtrArray *tokens, return out; } +static void +rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata) +{ + guint i; + gchar *k; + + if (cbdata) { + redisAsyncFree (cbdata->redis); + ucl_object_unref (cbdata->cur); + + for (i = 0; i < cbdata->cur_keys->len; i ++) { + k = g_ptr_array_index (cbdata->cur_keys, i); + g_free (k); + } + + g_ptr_array_free (cbdata->cur_keys, TRUE); + + if (cbdata->elt) { + cbdata->elt->cbdata = NULL; + } + + g_slice_free1 (sizeof (*cbdata), cbdata); + } +} + +/* Called when we have connected to the redis server and got keys to check */ +static void +rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv) +{ + struct rspamd_redis_stat_cbdata *cbdata = priv; + redisReply *reply = r, *elt; + gchar **k; + guint i, processed = 0; + + if (c->err == 0 && r != NULL) { + if (reply->type == REDIS_REPLY_ARRAY) { + g_ptr_array_set_size (cbdata->cur_keys, reply->elements); + + for (i = 0; i < reply->elements; i ++) { + elt = reply->element[i]; + + if (elt->type == REDIS_REPLY_STRING) { + k = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i); + *k = g_malloc (elt->len + 1); + rspamd_strlcpy (*k, elt->str, elt->len + 1); + processed ++; + } + } + + if (processed) { + + } + } + } + else { + msg_err ("cannot get keys to gather stat"); + rspamd_upstream_fail (cbdata->selected); + rspamd_redis_async_cbdata_cleanup (cbdata); + } +} + +static void +rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d) +{ + struct redis_stat_ctx *ctx = REDIS_CTX (d); + struct rspamd_redis_stat_elt *redis_elt = elt->ud; + struct rspamd_redis_stat_cbdata *cbdata; + rspamd_inet_addr_t *addr; + + g_assert (redis_elt != NULL); + + if (redis_elt->cbdata) { + /* We have some other process pending */ + rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata); + } + + /* Disable further events unless needed */ + elt->enabled = FALSE; + + cbdata = g_slice_alloc0 (sizeof (*cbdata)); + cbdata->selected = rspamd_upstream_get (ctx->read_servers, + RSPAMD_UPSTREAM_ROUND_ROBIN, + NULL, + 0); + + g_assert (cbdata->selected != NULL); + addr = rspamd_upstream_addr (cbdata->selected); + g_assert (addr != NULL); + cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr), + rspamd_inet_address_get_port (addr)); + g_assert (cbdata->redis != NULL); + + redisLibeventAttach (cbdata->redis, redis_elt->ev_base); + + cbdata->inflight = 1; + cbdata->cur = ucl_object_typed_new (UCL_OBJECT); + cbdata->elt = redis_elt; + cbdata->cur_keys = g_ptr_array_new (); + redis_elt->cbdata = cbdata; + + /* XXX: deal with timeouts maybe */ + /* Get keys in redis that match our symbol */ + redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata, + "KEYS %s*", + ctx->stcf->symbol); +} + +static void +rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d) +{ + struct rspamd_redis_stat_elt *redis_elt = elt->ud; + + rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata); +} + /* Called on connection termination */ static void rspamd_redis_fin (gpointer data) @@ -542,6 +677,7 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, { struct redis_stat_ctx *backend; struct rspamd_statfile_config *stf = st->stcf; + struct rspamd_redis_stat_elt *st_elt; const ucl_object_t *elt; backend = g_slice_alloc0 (sizeof (*backend)); @@ -605,6 +741,15 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, } stf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND; + backend->stcf = stf; + + st_elt = g_slice_alloc0 (sizeof (*st_elt)); + st_elt->ev_base = ctx->ev_base; + backend->stat_elt = rspamd_stat_ctx_register_async ( + rspamd_redis_async_stat_cb, + rspamd_redis_async_stat_fin, + st_elt, + REDIS_STAT_TIMEOUT); return (gpointer)backend; } |