guint i, blen, klen;
rspamd_fstring_t *out;
- out = rspamd_fstring_sized_new (1024);
sig = rspamd_mempool_get_variable (task->task_pool,
RSPAMD_MEMPOOL_STAT_SIGNATURE);
return;
}
+ out = rspamd_fstring_sized_new (1024);
klen = rspamd_snprintf (keybuf, sizeof (keybuf), "%s_%s_%s",
prefix, sig, rt->stcf->is_spam ? "S" : "H");
- out->len = 0;
-
/* Cleanup key */
rspamd_printf_fstring (&out, ""
"*2\r\n"
struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
redisAsyncContext *redis;
- rt->has_event = FALSE;
+ if (rt->has_event) {
+ /* Should not happen ! */
+ msg_err ("FIXME: this code path should not be reached!");
+ rspamd_session_remove_event (rt->task->s, NULL, rt);
+ rt->has_event = FALSE;
+ }
/* Stop timeout */
if (ev_can_stop (&rt->timeout_event)) {
ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
/* This calls for all callbacks pending */
redisAsyncFree (redis);
}
-}
-
-static void
-rspamd_redis_fin_learn (gpointer data)
-{
- struct redis_stat_runtime *rt = REDIS_RUNTIME (data);
- redisAsyncContext *redis;
-
- rt->has_event = FALSE;
- if (rt->tokens) {
- g_ptr_array_unref (rt->tokens);
- rt->tokens = NULL;
- }
-
- /* Stop timeout */
- ev_timer_stop (rt->task->event_loop, &rt->timeout_event);
-
- if (rt->redis) {
- redis = rt->redis;
- rt->redis = NULL;
- /* This calls for all callbacks pending */
- redisAsyncFree (redis);
+ if (rt->err) {
+ g_error_free (rt->err);
}
}
}
if (rt->has_event) {
rt->has_event = FALSE;
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+ rspamd_session_remove_event (task->s, NULL, rt);
}
}
}
if (rt->has_event) {
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+ rt->has_event = FALSE;
+ rspamd_session_remove_event (task->s, NULL, rt);
}
}
}
if (final && rt->has_event) {
- rspamd_session_remove_event (task->s, rspamd_redis_fin, rt);
+ rt->has_event = FALSE;
+ rspamd_session_remove_event (task->s, NULL, rt);
}
}
}
if (rt->has_event) {
- rspamd_session_remove_event (task->s, rspamd_redis_fin_learn, rt);
+ rt->has_event = FALSE;
+ rspamd_session_remove_event (task->s, NULL, rt);
}
}
static void
if (rspamd_redis_expand_object (ctx->redis_object, ctx, task,
&object_expanded) == 0) {
- msg_err_task ("expansion for learning failed for symbol %s "
+ msg_err_task ("expansion for %s failed for symbol %s "
"(maybe learning per user classifier with no user or recipient)",
+ learn ? "learning" : "classifying",
stcf->symbol);
return NULL;
}
rt = rspamd_mempool_alloc0 (task->task_pool, sizeof (*rt));
- rspamd_mempool_add_destructor (task->task_pool,
- rspamd_gerror_free_maybe, &rt->err);
rt->selected = up;
rt->task = task;
rt->ctx = ctx;
redisLibevAttach (task->event_loop, rt->redis);
rspamd_redis_maybe_auth (ctx, rt->redis);
+ rspamd_mempool_add_destructor (task->task_pool, rspamd_redis_fin, rt);
+
return rt;
}
if (redisAsyncCommand (rt->redis, rspamd_redis_connected, rt, "HGET %s %s",
rt->redis_object_expanded, learned_key) == REDIS_OK) {
- rspamd_session_add_event (task->s, rspamd_redis_fin, rt, M);
+ rspamd_session_add_event (task->s, NULL, rt, M);
rt->has_event = TRUE;
rt->tokens = g_ptr_array_ref (tokens);
gpointer ctx)
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
- redisAsyncContext *redis;
-
- if (ev_can_stop (&rt->timeout_event)) {
- ev_timer_stop (task->event_loop, &rt->timeout_event);
- }
-
- if (rt->redis) {
- redis = rt->redis;
- rt->redis = NULL;
- redisAsyncFree (redis);
- }
-
- if (rt->tokens) {
- g_ptr_array_unref (rt->tokens);
- rt->tokens = NULL;
- }
if (rt->err) {
msg_info_task ("cannot retrieve stat tokens from Redis: %e", rt->err);
g_error_free (rt->err);
rt->err = NULL;
-
+ rspamd_redis_fin (rt);
return FALSE;
}
+ rspamd_redis_fin (rt);
+
return TRUE;
}
gint id, gpointer p)
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (p);
- struct upstream *up;
- struct upstream_list *ups;
- rspamd_inet_addr_t *addr;
rspamd_fstring_t *query;
const gchar *redis_cmd;
rspamd_token_t *tok;
return FALSE;
}
- ups = rspamd_redis_get_servers (rt->ctx, "write_servers");
-
- if (!ups) {
- return FALSE;
- }
- up = rspamd_upstream_get (ups,
- RSPAMD_UPSTREAM_MASTER_SLAVE,
- NULL,
- 0);
-
- if (up == NULL) {
- msg_err_task ("no upstreams reachable");
- return FALSE;
- }
-
- rt->selected = up;
-
if (rt->ctx->new_schema) {
if (rt->ctx->stcf->is_spam) {
learned_key = "learns_spam";
}
}
- addr = rspamd_upstream_addr_next (up);
- g_assert (addr != NULL);
-
- if (rspamd_inet_address_get_af (addr) == AF_UNIX) {
- rt->redis = redisAsyncConnectUnix (rspamd_inet_address_to_string (addr));
- }
- else {
- rt->redis = redisAsyncConnect (rspamd_inet_address_to_string (addr),
- rspamd_inet_address_get_port (addr));
- }
-
- if (rt->redis == NULL) {
- msg_warn_task ("cannot connect to redis server %s: %s",
- rspamd_inet_address_to_string_pretty (addr),
- strerror (errno));
-
- return FALSE;
- }
- else if (rt->redis->err != REDIS_OK) {
- msg_warn_task ("cannot connect to redis server %s: %s",
- rspamd_inet_address_to_string_pretty (addr),
- rt->redis->errstr);
- redisAsyncFree (rt->redis);
- rt->redis = NULL;
-
- return FALSE;
- }
-
- redisLibevAttach (task->event_loop, rt->redis);
- rspamd_redis_maybe_auth (rt->ctx, rt->redis);
-
/*
* Add the current key to the set of learned keys
*/
"RSIG");
}
- rspamd_session_add_event (task->s, rspamd_redis_fin_learn, rt, M);
+ rspamd_session_add_event (task->s, NULL, rt, M);
rt->has_event = TRUE;
/* Set timeout */
gpointer ctx, GError **err)
{
struct redis_stat_runtime *rt = REDIS_RUNTIME (runtime);
- redisAsyncContext *redis;
-
- if (ev_can_stop (&rt->timeout_event)) {
- ev_timer_stop (task->event_loop, &rt->timeout_event);
- }
-
- if (rt->redis) {
- redis = rt->redis;
- rt->redis = NULL;
- redisAsyncFree (redis);
- }
if (rt->err) {
g_propagate_error (err, rt->err);
rt->err = NULL;
+ rspamd_redis_fin (rt);
return FALSE;
}
+ rspamd_redis_fin (rt);
+
return TRUE;
}