aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-01-09 14:43:24 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-01-09 14:43:24 +0000
commit193accf7ddbf86d2b9acc351cc7faec02fa9243e (patch)
tree9111d5fec943b1ea3c5d7bbb724205be09e33b9a /src
parentaee3e6a68d84f7ed2047e4b9b939492008b7f7af (diff)
downloadrspamd-193accf7ddbf86d2b9acc351cc7faec02fa9243e.tar.gz
rspamd-193accf7ddbf86d2b9acc351cc7faec02fa9243e.zip
Start implementation of lazy redis statistics.
Diffstat (limited to 'src')
-rw-r--r--src/libstat/backends/redis_backend.c147
1 files changed, 146 insertions, 1 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index 683ba0fda..cf6f90d06 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -37,11 +37,13 @@
#define REDIS_DEFAULT_PORT 6379
#define REDIS_DEFAULT_OBJECT "%s%l"
#define REDIS_DEFAULT_TIMEOUT 0.5
+#define REDIS_STAT_TIMEOUT 30
struct redis_stat_ctx {
+ struct rspamd_statfile_config *stcf;
struct upstream_list *read_servers;
struct upstream_list *write_servers;
-
+ struct rspamd_stat_async_elt *stat_elt;
const gchar *redis_object;
gdouble timeout;
};
@@ -66,6 +68,24 @@ struct redis_stat_runtime {
enum rspamd_redis_connection_state conn_state;
};
+/* Used to get statistics from redis */
+struct rspamd_redis_stat_cbdata;
+
+struct rspamd_redis_stat_elt {
+ struct event_base *ev_base;
+ ucl_object_t *stat;
+ struct rspamd_redis_stat_cbdata *cbdata;
+};
+
+struct rspamd_redis_stat_cbdata {
+ struct rspamd_redis_stat_elt *elt;
+ redisAsyncContext *redis;
+ ucl_object_t *cur;
+ GPtrArray *cur_keys;
+ struct upstream *selected;
+ guint inflight;
+};
+
#define GET_TASK_ELT(task, elt) (task == NULL ? NULL : (task)->elt)
static GQuark
@@ -348,6 +368,121 @@ rspamd_redis_tokens_to_query (struct rspamd_task *task, GPtrArray *tokens,
return out;
}
+static void
+rspamd_redis_async_cbdata_cleanup (struct rspamd_redis_stat_cbdata *cbdata)
+{
+ guint i;
+ gchar *k;
+
+ if (cbdata) {
+ redisAsyncFree (cbdata->redis);
+ ucl_object_unref (cbdata->cur);
+
+ for (i = 0; i < cbdata->cur_keys->len; i ++) {
+ k = g_ptr_array_index (cbdata->cur_keys, i);
+ g_free (k);
+ }
+
+ g_ptr_array_free (cbdata->cur_keys, TRUE);
+
+ if (cbdata->elt) {
+ cbdata->elt->cbdata = NULL;
+ }
+
+ g_slice_free1 (sizeof (*cbdata), cbdata);
+ }
+}
+
+/* Called when we have connected to the redis server and got keys to check */
+static void
+rspamd_redis_stat_keys (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+ struct rspamd_redis_stat_cbdata *cbdata = priv;
+ redisReply *reply = r, *elt;
+ gchar **k;
+ guint i, processed = 0;
+
+ if (c->err == 0 && r != NULL) {
+ if (reply->type == REDIS_REPLY_ARRAY) {
+ g_ptr_array_set_size (cbdata->cur_keys, reply->elements);
+
+ for (i = 0; i < reply->elements; i ++) {
+ elt = reply->element[i];
+
+ if (elt->type == REDIS_REPLY_STRING) {
+ k = (gchar **)&g_ptr_array_index (cbdata->cur_keys, i);
+ *k = g_malloc (elt->len + 1);
+ rspamd_strlcpy (*k, elt->str, elt->len + 1);
+ processed ++;
+ }
+ }
+
+ if (processed) {
+
+ }
+ }
+ }
+ else {
+ msg_err ("cannot get keys to gather stat");
+ rspamd_upstream_fail (cbdata->selected);
+ rspamd_redis_async_cbdata_cleanup (cbdata);
+ }
+}
+
+static void
+rspamd_redis_async_stat_cb (struct rspamd_stat_async_elt *elt, gpointer d)
+{
+ struct redis_stat_ctx *ctx = REDIS_CTX (d);
+ struct rspamd_redis_stat_elt *redis_elt = elt->ud;
+ struct rspamd_redis_stat_cbdata *cbdata;
+ rspamd_inet_addr_t *addr;
+
+ g_assert (redis_elt != NULL);
+
+ if (redis_elt->cbdata) {
+ /* We have some other process pending */
+ rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+ }
+
+ /* Disable further events unless needed */
+ elt->enabled = FALSE;
+
+ cbdata = g_slice_alloc0 (sizeof (*cbdata));
+ cbdata->selected = rspamd_upstream_get (ctx->read_servers,
+ RSPAMD_UPSTREAM_ROUND_ROBIN,
+ NULL,
+ 0);
+
+ g_assert (cbdata->selected != NULL);
+ addr = rspamd_upstream_addr (cbdata->selected);
+ g_assert (addr != NULL);
+ cbdata->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
+ rspamd_inet_address_get_port (addr));
+ g_assert (cbdata->redis != NULL);
+
+ redisLibeventAttach (cbdata->redis, redis_elt->ev_base);
+
+ cbdata->inflight = 1;
+ cbdata->cur = ucl_object_typed_new (UCL_OBJECT);
+ cbdata->elt = redis_elt;
+ cbdata->cur_keys = g_ptr_array_new ();
+ redis_elt->cbdata = cbdata;
+
+ /* XXX: deal with timeouts maybe */
+ /* Get keys in redis that match our symbol */
+ redisAsyncCommand (cbdata->redis, rspamd_redis_stat_keys, cbdata,
+ "KEYS %s*",
+ ctx->stcf->symbol);
+}
+
+static void
+rspamd_redis_async_stat_fin (struct rspamd_stat_async_elt *elt, gpointer d)
+{
+ struct rspamd_redis_stat_elt *redis_elt = elt->ud;
+
+ rspamd_redis_async_cbdata_cleanup (redis_elt->cbdata);
+}
+
/* Called on connection termination */
static void
rspamd_redis_fin (gpointer data)
@@ -542,6 +677,7 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx,
{
struct redis_stat_ctx *backend;
struct rspamd_statfile_config *stf = st->stcf;
+ struct rspamd_redis_stat_elt *st_elt;
const ucl_object_t *elt;
backend = g_slice_alloc0 (sizeof (*backend));
@@ -605,6 +741,15 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx,
}
stf->clcf->flags |= RSPAMD_FLAG_CLASSIFIER_INCREMENTING_BACKEND;
+ backend->stcf = stf;
+
+ st_elt = g_slice_alloc0 (sizeof (*st_elt));
+ st_elt->ev_base = ctx->ev_base;
+ backend->stat_elt = rspamd_stat_ctx_register_async (
+ rspamd_redis_async_stat_cb,
+ rspamd_redis_async_stat_fin,
+ st_elt,
+ REDIS_STAT_TIMEOUT);
return (gpointer)backend;
}