aboutsummaryrefslogtreecommitdiffstats
path: root/src/libstat
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2016-01-11 10:40:19 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2016-01-11 10:40:42 +0000
commitb9dd124a65b946df04e1edc7722f024f798913b3 (patch)
tree5bd750d9e1cee9cade0c34c9faf3bb76bae326df /src/libstat
parent8e8bddff44da6fc4fa9eef6405d7f51adb77d467 (diff)
downloadrspamd-b9dd124a65b946df04e1edc7722f024f798913b3.tar.gz
rspamd-b9dd124a65b946df04e1edc7722f024f798913b3.zip
Implement redis cache operations
Diffstat (limited to 'src/libstat')
-rw-r--r--src/libstat/backends/redis_backend.c2
-rw-r--r--src/libstat/learn_cache/redis_cache.c106
2 files changed, 102 insertions, 6 deletions
diff --git a/src/libstat/backends/redis_backend.c b/src/libstat/backends/redis_backend.c
index 1c1690fc3..f1857871e 100644
--- a/src/libstat/backends/redis_backend.c
+++ b/src/libstat/backends/redis_backend.c
@@ -674,7 +674,7 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
if (c->err == 0) {
if (r != NULL) {
if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
- rt->learned = reply->integer;
+ val = reply->integer;
}
else if (reply->type == REDIS_REPLY_STRING) {
rspamd_strtol (reply->str, reply->len, &val);
diff --git a/src/libstat/learn_cache/redis_cache.c b/src/libstat/learn_cache/redis_cache.c
index 56c651fc9..9224a5f46 100644
--- a/src/libstat/learn_cache/redis_cache.c
+++ b/src/libstat/learn_cache/redis_cache.c
@@ -83,6 +83,75 @@ rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d);
}
+/* Called when we have checked the specified message id */
+static void
+rspamd_stat_cache_redis_get (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+ struct rspamd_redis_cache_runtime *rt = priv;
+ redisReply *reply = r;
+ struct rspamd_task *task;
+ glong val = 0;
+
+ task = rt->task;
+
+ if (c->err == 0) {
+ if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
+ val = reply->integer;
+ }
+ else if (reply->type == REDIS_REPLY_STRING) {
+ rspamd_strtol (reply->str, reply->len, &val);
+ }
+ else {
+ if (reply->type != REDIS_REPLY_NIL) {
+ msg_err_task ("bad learned type for %s: %d",
+ rt->ctx->stcf->symbol, reply->type);
+ }
+
+ val = 0;
+ }
+
+ if ((val > 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM)) ||
+ (val < 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_HAM))) {
+ /* Already learned */
+ g_set_error (&task->err, rspamd_stat_quark (), 404,
+ "<%s> has been already "
+ "learned as %s, ignore it", task->message_id,
+ (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? "spam" : "ham");
+ task->flags |= RSPAMD_TASK_FLAG_ALREADY_LEARNED;
+ }
+ else if (val != 0) {
+ /* Unlearn flag */
+ task->flags |= RSPAMD_TASK_FLAG_UNLEARN;
+ }
+ rspamd_upstream_ok (rt->selected);
+ }
+ else {
+ rspamd_upstream_fail (rt->selected);
+ }
+
+ rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
+}
+
+/* Called when we have learned the specified message id */
+static void
+rspamd_stat_cache_redis_set (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+ struct rspamd_redis_cache_runtime *rt = priv;
+ struct rspamd_task *task;
+
+ task = rt->task;
+
+ if (c->err == 0) {
+ /* XXX: we ignore results here */
+ rspamd_upstream_ok (rt->selected);
+ }
+ else {
+ rspamd_upstream_fail (rt->selected);
+ }
+
+ rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
+}
+
static void
rspamd_stat_cache_redis_generate_id (struct rspamd_task *task)
{
@@ -183,7 +252,6 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
struct rspamd_redis_cache_runtime *rt;
struct upstream *up;
rspamd_inet_addr_t *addr;
- struct timeval tv;
g_assert (ctx != NULL);
@@ -223,14 +291,10 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
g_assert (rt->redis != NULL);
redisLibeventAttach (rt->redis, task->ev_base);
- rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
- rspamd_stat_cache_redis_quark ());
/* Now check stats */
event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt);
event_base_set (task->ev_base, &rt->timeout_event);
- double_to_tv (ctx->timeout, &tv);
- event_add (&rt->timeout_event, &tv);
if (!learn) {
rspamd_stat_cache_redis_generate_id (task);
@@ -246,11 +310,23 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
gpointer c)
{
struct rspamd_redis_cache_runtime *rt = runtime;
+ struct timeval tv;
gchar *h;
h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
g_assert (h != NULL);
+ double_to_tv (rt->ctx->timeout, &tv);
+
+ if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
+ "HGET %s %s",
+ rt->ctx->redis_object, h) == REDIS_OK) {
+ rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
+ rspamd_stat_cache_redis_quark ());
+ event_add (&rt->timeout_event, &tv);
+ }
+
+ /* We need to return OK every time */
return RSPAMD_LEARN_OK;
}
@@ -260,6 +336,26 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
gpointer runtime,
gpointer c)
{
+ struct rspamd_redis_cache_runtime *rt = runtime;
+ struct timeval tv;
+ gchar *h;
+ gint flag;
+
+ h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
+ g_assert (h != NULL);
+
+ double_to_tv (rt->ctx->timeout, &tv);
+ flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1;
+
+ if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
+ "HSET %s %s %d",
+ rt->ctx->redis_object, h) == REDIS_OK) {
+ rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
+ rspamd_stat_cache_redis_quark ());
+ event_add (&rt->timeout_event, &tv);
+ }
+
+ /* We need to return OK every time */
return RSPAMD_LEARN_OK;
}