diff options
-rw-r--r-- | src/libstat/backends/redis_backend.c | 2 | ||||
-rw-r--r-- | src/libstat/learn_cache/redis_cache.c | 106 |
2 files changed, 102 insertions, 6 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c index 1c1690fc3..f1857871e 100644 --- a/src/libstat/backends/redis_backend.c +++ b/src/libstat/backends/redis_backend.c @@ -674,7 +674,7 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv) if (c->err == 0) { if (r != NULL) { if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) { - rt->learned = reply->integer; + val = reply->integer; } else if (reply->type == REDIS_REPLY_STRING) { rspamd_strtol (reply->str, reply->len, &val); diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c index 56c651fc9..9224a5f46 100644 --- a/src/libstat/learn_cache/redis_cache.c +++ b/src/libstat/learn_cache/redis_cache.c @@ -83,6 +83,75 @@ rspamd_redis_cache_timeout (gint fd, short what, gpointer d) rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d); } +/* Called when we have checked the specified message id */ +static void +rspamd_stat_cache_redis_get (redisAsyncContext *c, gpointer r, gpointer priv) +{ + struct rspamd_redis_cache_runtime *rt = priv; + redisReply *reply = r; + struct rspamd_task *task; + glong val = 0; + + task = rt->task; + + if (c->err == 0) { + if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) { + val = reply->integer; + } + else if (reply->type == REDIS_REPLY_STRING) { + rspamd_strtol (reply->str, reply->len, &val); + } + else { + if (reply->type != REDIS_REPLY_NIL) { + msg_err_task ("bad learned type for %s: %d", + rt->ctx->stcf->symbol, reply->type); + } + + val = 0; + } + + if ((val > 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM)) || + (val < 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_HAM))) { + /* Already learned */ + g_set_error (&task->err, rspamd_stat_quark (), 404, + "<%s> has been already " + "learned as %s, ignore it", task->message_id, + (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? "spam" : "ham"); + task->flags |= RSPAMD_TASK_FLAG_ALREADY_LEARNED; + } + else if (val != 0) { + /* Unlearn flag */ + task->flags |= RSPAMD_TASK_FLAG_UNLEARN; + } + rspamd_upstream_ok (rt->selected); + } + else { + rspamd_upstream_fail (rt->selected); + } + + rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt); +} + +/* Called when we have learned the specified message id */ +static void +rspamd_stat_cache_redis_set (redisAsyncContext *c, gpointer r, gpointer priv) +{ + struct rspamd_redis_cache_runtime *rt = priv; + struct rspamd_task *task; + + task = rt->task; + + if (c->err == 0) { + /* XXX: we ignore results here */ + rspamd_upstream_ok (rt->selected); + } + else { + rspamd_upstream_fail (rt->selected); + } + + rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt); +} + static void rspamd_stat_cache_redis_generate_id (struct rspamd_task *task) { @@ -183,7 +252,6 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task, struct rspamd_redis_cache_runtime *rt; struct upstream *up; rspamd_inet_addr_t *addr; - struct timeval tv; g_assert (ctx != NULL); @@ -223,14 +291,10 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task, g_assert (rt->redis != NULL); redisLibeventAttach (rt->redis, task->ev_base); - rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt, - rspamd_stat_cache_redis_quark ()); /* Now check stats */ event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt); event_base_set (task->ev_base, &rt->timeout_event); - double_to_tv (ctx->timeout, &tv); - event_add (&rt->timeout_event, &tv); if (!learn) { rspamd_stat_cache_redis_generate_id (task); @@ -246,11 +310,23 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task, gpointer c) { struct rspamd_redis_cache_runtime *rt = runtime; + struct timeval tv; gchar *h; h = rspamd_mempool_get_variable (task->task_pool, "words_hash"); g_assert (h != NULL); + double_to_tv (rt->ctx->timeout, &tv); + + if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt, + "HGET %s %s", + rt->ctx->redis_object, h) == REDIS_OK) { + rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt, + rspamd_stat_cache_redis_quark ()); + event_add (&rt->timeout_event, &tv); + } + + /* We need to return OK every time */ return RSPAMD_LEARN_OK; } @@ -260,6 +336,26 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task, gpointer runtime, gpointer c) { + struct rspamd_redis_cache_runtime *rt = runtime; + struct timeval tv; + gchar *h; + gint flag; + + h = rspamd_mempool_get_variable (task->task_pool, "words_hash"); + g_assert (h != NULL); + + double_to_tv (rt->ctx->timeout, &tv); + flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1; + + if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt, + "HSET %s %s %d", + rt->ctx->redis_object, h) == REDIS_OK) { + rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt, + rspamd_stat_cache_redis_quark ()); + event_add (&rt->timeout_event, &tv); + } + + /* We need to return OK every time */ return RSPAMD_LEARN_OK; } |