From 50dba2f90e07efeed5c7ad2c0066a181e330065f Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Fri, 17 Jun 2016 11:20:06 +0100 Subject: [PATCH] [Fix] Finally rework and simplify redis backend for statistics --- src/libstat/backends/redis_backend.c | 224 ++++++++++----------------- 1 file changed, 84 insertions(+), 140 deletions(-) diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index 823a5f71b..c0d54325e 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -66,8 +66,6 @@ struct redis_stat_runtime { redisAsyncContext *redis; guint64 learned; gint id; - enum rspamd_redis_connection_state conn_state; - ref_entry_t ref; }; /* Used to get statistics from redis */ @@ -679,11 +677,18 @@ static void rspamd_redis_fin (gpointer data) { struct redis_stat_runtime *rt = REDIS_RUNTIME (data); + redisAsyncContext *redis; - if (rt->conn_state != RSPAMD_REDIS_TERMINATED) { - rt->conn_state = RSPAMD_REDIS_TERMINATED; + /* Stop timeout */ + if (event_get_base (&rt->timeout_event)) { event_del (&rt->timeout_event); - REF_RELEASE (rt); + } + + if (rt->redis) { + redis = rt->redis; + rt->redis = NULL; + /* This calls for all callbacks pending */ + redisAsyncFree (redis); } } @@ -691,11 +696,18 @@ static void rspamd_redis_fin_learn (gpointer data) { struct redis_stat_runtime *rt = REDIS_RUNTIME (data); + redisAsyncContext *redis; - if (rt->conn_state != RSPAMD_REDIS_TERMINATED) { - rt->conn_state = RSPAMD_REDIS_TERMINATED; + /* Stop timeout */ + if (event_get_base (&rt->timeout_event)) { event_del (&rt->timeout_event); - REF_RELEASE (rt); + } + + if (rt->redis) { + redis = rt->redis; + rt->redis = NULL; + /* This calls for all callbacks pending */ + redisAsyncFree (redis); } } @@ -704,26 +716,19 @@ rspamd_redis_timeout (gint fd, short what, gpointer d) { struct redis_stat_runtime *rt = REDIS_RUNTIME (d); struct rspamd_task *task; + redisAsyncContext *redis; task = rt->task; - REF_RETAIN (rt); msg_err_task_check ("connection to redis server %s timed out", rspamd_upstream_name (rt->selected)); - rspamd_upstream_fail (rt->selected); - - if (rt->conn_state == RSPAMD_REDIS_REQUEST_SENT && rt->task) { - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); - } - - rt->conn_state = RSPAMD_REDIS_TERMINATED; if (rt->redis) { - redisAsyncFree (rt->redis); + redis = rt->redis; + rt->redis = NULL; + /* This calls for all callbacks pending */ + redisAsyncFree (redis); } - - rt->redis = NULL; - REF_RELEASE (rt); } /* Called when we have connected to the redis server and got stats */ @@ -737,12 +742,6 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) task = rt->task; - if (rt->conn_state == RSPAMD_REDIS_TERMINATED) { - /* Task has disappeared already */ - REF_RELEASE (rt); - return; - } - if (c->err == 0) { if (r != NULL) { if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) { @@ -767,27 +766,17 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) } rt->learned = val; - REF_RETAIN (rt); msg_debug_task ("connected to redis server, tokens learned for %s: %uL", rt->redis_object_expanded, rt->learned); rspamd_upstream_ok (rt->selected); - /* This also set state to terminated state */ - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); - rt->conn_state = RSPAMD_REDIS_CONNECTED; - } - else { - /* This could be caused by removing redis context forcefully */ - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); } } else { msg_err_task ("error getting reply from redis server %s: %s", rspamd_upstream_name (rt->selected), c->errstr); rspamd_upstream_fail (rt->selected); - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); } - REF_RELEASE (rt); } /* Called when we have received tokens values from redis */ @@ -804,12 +793,6 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv) task = rt->task; - if (rt->conn_state == RSPAMD_REDIS_TERMINATED) { - /* Task has disappeared already */ - REF_RELEASE (rt); - return; - } - if (c->err == 0) { if (r != NULL) { if (reply->type == REDIS_REPLY_ARRAY) { @@ -851,36 +834,29 @@ rspamd_redis_processed (redisAsyncContext *c, gpointer r, gpointer priv) } } else { - msg_err_task ("got invalid length of reply vector from redis: " + msg_err_task_check ("got invalid length of reply vector from redis: " "%d, expected: %d", (gint)reply->elements, (gint)task->tokens->len); } } else { - msg_err_task ("got invalid reply from redis: %d", + msg_err_task_check ("got invalid reply from redis: %d", reply->type); } - msg_debug_task ("received tokens for %s: %d processed, %d found", + msg_debug_task_check ("received tokens for %s: %d processed, %d found", rt->redis_object_expanded, processed, found); rspamd_upstream_ok (rt->selected); - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); - } - else { - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); } - - rt->conn_state = RSPAMD_REDIS_CONNECTED; } else { msg_err_task ("error getting reply from redis server %s: %s", rspamd_upstream_name (rt->selected), c->errstr); rspamd_upstream_fail (rt->selected); - rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); } - REF_RELEASE (rt); + rspamd_session_remove_event (task->s, rspamd_redis_fin, rt); } /* Called when we have set tokens during learning */ @@ -892,29 +868,16 @@ rspamd_redis_learned (redisAsyncContext *c, gpointer r, gpointer priv) task = rt->task; - if (rt->conn_state == RSPAMD_REDIS_TERMINATED) { - /* Task has disappeared already */ - REF_RELEASE (rt); - return; - } - if (c->err == 0) { rspamd_upstream_ok (rt->selected); rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt); } else { - msg_err_task ("error getting reply from redis server %s: %s", + msg_err_task_check ("error getting reply from redis server %s: %s", rspamd_upstream_name (rt->selected), c->errstr); rspamd_upstream_fail (rt->selected); rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt); } - - if (rt->conn_state != RSPAMD_REDIS_TERMINATED) { - rt->conn_state = RSPAMD_REDIS_TERMINATED; - redisAsyncFree (rt->redis); - } - - REF_RELEASE (rt); } static gboolean @@ -1094,16 +1057,6 @@ rspamd_redis_init (struct rspamd_stat_ctx *ctx, return (gpointer)backend; } -static void -rspamd_redis_runtime_dtor (struct redis_stat_runtime *rt) -{ - if (event_get_base (&rt->timeout_event)) { - event_del (&rt->timeout_event); - } - - g_slice_free1 (sizeof (*rt), rt); -} - gpointer rspamd_redis_runtime (struct rspamd_task *task, struct rspamd_statfile_config *stcf, @@ -1113,7 +1066,6 @@ rspamd_redis_runtime (struct rspamd_task *task, struct redis_stat_runtime *rt; struct upstream *up; rspamd_inet_addr_t *addr; - struct timeval tv; g_assert (ctx != NULL); g_assert (stcf != NULL); @@ -1141,40 +1093,27 @@ rspamd_redis_runtime (struct rspamd_task *task, return NULL; } - rt = g_slice_alloc0 (sizeof (*rt)); - REF_INIT_RETAIN (rt, rspamd_redis_runtime_dtor); + rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt)); rspamd_redis_expand_object (ctx->redis_object, ctx, task, &rt->redis_object_expanded); rt->selected = up; rt->task = task; rt->ctx = ctx; rt->stcf = stcf; - rt->conn_state = RSPAMD_REDIS_DISCONNECTED; addr = rspamd_upstream_addr (up); g_assert (addr != NULL); rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr), rspamd_inet_address_get_port (addr)); - g_assert (rt->redis != NULL); + + if (rt->redis == NULL) { + msg_err_task ("cannot connect redis"); + return NULL; + } redisLibeventAttach (rt->redis, task->ev_base); rspamd_redis_maybe_auth (ctx, rt->redis); - if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s", - rt->redis_object_expanded, "learns") == REDIS_OK) { - rt->conn_state = RSPAMD_REDIS_REQUEST_SENT; - - rspamd_session_add_event (task->s, rspamd_redis_fin, rt, - rspamd_redis_stat_quark ()); - - event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt); - event_base_set (task->ev_base, &rt->timeout_event); - double_to_tv (ctx->timeout, &tv); - event_add (&rt->timeout_event, &tv); - /* Cleared by timeout */ - REF_RETAIN (rt); - } - return rt; } @@ -1204,34 +1143,42 @@ rspamd_redis_process_tokens (struct rspamd_task *task, struct timeval tv; gint ret; - if (tokens == NULL || tokens->len == 0 || rt->redis == NULL || - rt->conn_state != RSPAMD_REDIS_CONNECTED) { + if (tokens == NULL || tokens->len == 0 || rt->redis == NULL) { return FALSE; } rt->id = id; - query = rspamd_redis_tokens_to_query (task, tokens, - "HMGET", rt->redis_object_expanded, FALSE, -1, - rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER); - g_assert (query != NULL); - rspamd_mempool_add_destructor (task->task_pool, - (rspamd_mempool_destruct_t)rspamd_fstring_free, query); - ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_processed, rt, - query->str, query->len); - if (ret == REDIS_OK) { - rt->conn_state = RSPAMD_REDIS_REQUEST_SENT; + if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s", + rt->redis_object_expanded, "learns") == REDIS_OK) { + rspamd_session_add_event (task->s, rspamd_redis_fin, rt, rspamd_redis_stat_quark ()); - /* Reset timeout */ - event_del (&rt->timeout_event); + + if (event_get_base (&rt->timeout_event)) { + event_del (&rt->timeout_event); + } + event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt); + event_base_set (task->ev_base, &rt->timeout_event); double_to_tv (rt->ctx->timeout, &tv); event_add (&rt->timeout_event, &tv); - return TRUE; - } - else { - msg_err_task ("call to redis failed: %s", rt->redis->errstr); + query = rspamd_redis_tokens_to_query (task, tokens, + "HMGET", rt->redis_object_expanded, FALSE, -1, + rt->stcf->clcf->flags & RSPAMD_FLAG_CLASSIFIER_INTEGER); + g_assert (query != NULL); + rspamd_mempool_add_destructor (task->task_pool, + (rspamd_mempool_destruct_t)rspamd_fstring_free, query); + + ret = redisAsyncFormattedCommand (rt->redis, rspamd_redis_processed, rt, + query->str, query->len); + + if (ret == REDIS_OK) { + return TRUE; + } + else { + msg_err_task ("call to redis failed: %s", rt->redis->errstr); + } } return FALSE; @@ -1242,14 +1189,16 @@ rspamd_redis_finalize_process (struct rspamd_task *task, gpointer runtime, gpointer ctx) { struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); + redisAsyncContext *redis; - if (rt->conn_state != RSPAMD_REDIS_TERMINATED) { + if (event_get_base (&rt->timeout_event)) { event_del (&rt->timeout_event); - rt->conn_state = RSPAMD_REDIS_TERMINATED; + } - redisAsyncFree (rt->redis); + if (rt->redis) { + redis = rt->redis; rt->redis = NULL; - REF_RELEASE (rt); + redisAsyncFree (redis); } } @@ -1266,13 +1215,6 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, rspamd_token_t *tok; gint ret; - if (rt->conn_state == RSPAMD_REDIS_CONNECTED) { - /* We are likely in some bad state */ - msg_err_task ("invalid state for function: %d", rt->conn_state); - - return FALSE; - } - up = rspamd_upstream_get (rt->ctx->write_servers, RSPAMD_UPSTREAM_MASTER_SLAVE, NULL, @@ -1359,11 +1301,14 @@ rspamd_redis_learn_tokens (struct rspamd_task *task, GPtrArray *tokens, if (ret == REDIS_OK) { rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt, rspamd_redis_stat_quark ()); - /* Reset timeout */ - event_del (&rt->timeout_event); + /* Set timeout */ + if (event_get_base (&rt->timeout_event)) { + event_del (&rt->timeout_event); + } + event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_timeout, rt); + event_base_set (task->ev_base, &rt->timeout_event); double_to_tv (rt->ctx->timeout, &tv); event_add (&rt->timeout_event, &tv); - rt->conn_state = RSPAMD_REDIS_CONNECTED; return TRUE; } @@ -1380,13 +1325,16 @@ rspamd_redis_finalize_learn (struct rspamd_task *task, gpointer runtime, gpointer ctx) { struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); + redisAsyncContext *redis; - if (rt->conn_state == RSPAMD_REDIS_CONNECTED) { + if (event_get_base (&rt->timeout_event)) { event_del (&rt->timeout_event); - rt->conn_state = RSPAMD_REDIS_TERMINATED; - redisAsyncFree (rt->redis); + } + + if (rt->redis) { + redis = rt->redis; rt->redis = NULL; - REF_RELEASE (rt); + redisAsyncFree (redis); } } @@ -1434,18 +1382,15 @@ rspamd_redis_get_stat (gpointer runtime, { struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime); struct rspamd_redis_stat_elt *st; + redisAsyncContext *redis; if (rt->ctx->stat_elt) { st = rt->ctx->stat_elt->ud; if (rt->redis) { - if (rt->conn_state == RSPAMD_REDIS_REQUEST_SENT && rt->task) { - rspamd_session_remove_event (rt->task->s, rspamd_redis_fin, rt); - } - event_del (&rt->timeout_event); - rt->conn_state = RSPAMD_REDIS_TERMINATED; - redisAsyncFree (rt->redis); + redis = rt->redis; rt->redis = NULL; + redisAsyncFree (redis); } if (st->stat) { @@ -1463,5 +1408,4 @@ rspamd_redis_load_tokenizer_config (gpointer runtime, return NULL; } - #endif -- 2.39.5