]> source.dussan.org Git - rspamd.git/commitdiff
Implement redis cache operations
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 11 Jan 2016 10:40:19 +0000 (10:40 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 11 Jan 2016 10:40:42 +0000 (10:40 +0000)
src/libstat/backends/redis_backend.c
src/libstat/learn_cache/redis_cache.c

index 1c1690fc39a797acdf7f54bc19654553ed274724..f1857871e8556b043b8c9398aa27227f7bb02745 100644 (file)
@@ -674,7 +674,7 @@ rspamd_redis_connected (redisAsyncContext *c, gpointer r, gpointer priv)
        if (c->err == 0) {
                if (r != NULL) {
                        if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
-                               rt->learned = reply->integer;
+                               val = reply->integer;
                        }
                        else if (reply->type == REDIS_REPLY_STRING) {
                                rspamd_strtol (reply->str, reply->len, &val);
index 56c651fc978bac9065a1521e4e2db7184f98c878..9224a5f46e084c2ccd0859dd608f3512d8417942 100644 (file)
@@ -83,6 +83,75 @@ rspamd_redis_cache_timeout (gint fd, short what, gpointer d)
        rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, d);
 }
 
+/* Called when we have checked the specified message id */
+static void
+rspamd_stat_cache_redis_get (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+       struct rspamd_redis_cache_runtime *rt = priv;
+       redisReply *reply = r;
+       struct rspamd_task *task;
+       glong val = 0;
+
+       task = rt->task;
+
+       if (c->err == 0) {
+               if (G_LIKELY (reply->type == REDIS_REPLY_INTEGER)) {
+                       val = reply->integer;
+               }
+               else if (reply->type == REDIS_REPLY_STRING) {
+                       rspamd_strtol (reply->str, reply->len, &val);
+               }
+               else {
+                       if (reply->type != REDIS_REPLY_NIL) {
+                               msg_err_task ("bad learned type for %s: %d",
+                                       rt->ctx->stcf->symbol, reply->type);
+                       }
+
+                       val = 0;
+               }
+
+               if ((val > 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM)) ||
+                               (val < 0 && (task->flags & RSPAMD_TASK_FLAG_LEARN_HAM))) {
+                       /* Already learned */
+                       g_set_error (&task->err, rspamd_stat_quark (), 404,
+                                       "<%s> has been already "
+                                       "learned as %s, ignore it", task->message_id,
+                                       (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? "spam" : "ham");
+                       task->flags |= RSPAMD_TASK_FLAG_ALREADY_LEARNED;
+               }
+               else if (val != 0) {
+                       /* Unlearn flag */
+                       task->flags |= RSPAMD_TASK_FLAG_UNLEARN;
+               }
+               rspamd_upstream_ok (rt->selected);
+       }
+       else {
+               rspamd_upstream_fail (rt->selected);
+       }
+
+       rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
+}
+
+/* Called when we have learned the specified message id */
+static void
+rspamd_stat_cache_redis_set (redisAsyncContext *c, gpointer r, gpointer priv)
+{
+       struct rspamd_redis_cache_runtime *rt = priv;
+       struct rspamd_task *task;
+
+       task = rt->task;
+
+       if (c->err == 0) {
+               /* XXX: we ignore results here */
+               rspamd_upstream_ok (rt->selected);
+       }
+       else {
+               rspamd_upstream_fail (rt->selected);
+       }
+
+       rspamd_session_remove_event (task->s, rspamd_redis_cache_fin, rt);
+}
+
 static void
 rspamd_stat_cache_redis_generate_id (struct rspamd_task *task)
 {
@@ -183,7 +252,6 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
        struct rspamd_redis_cache_runtime *rt;
        struct upstream *up;
        rspamd_inet_addr_t *addr;
-       struct timeval tv;
 
        g_assert (ctx != NULL);
 
@@ -223,14 +291,10 @@ rspamd_stat_cache_redis_runtime (struct rspamd_task *task,
        g_assert (rt->redis != NULL);
 
        redisLibeventAttach (rt->redis, task->ev_base);
-       rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
-                       rspamd_stat_cache_redis_quark ());
 
        /* Now check stats */
        event_set (&rt->timeout_event, -1, EV_TIMEOUT, rspamd_redis_cache_timeout, rt);
        event_base_set (task->ev_base, &rt->timeout_event);
-       double_to_tv (ctx->timeout, &tv);
-       event_add (&rt->timeout_event, &tv);
 
        if (!learn) {
                rspamd_stat_cache_redis_generate_id (task);
@@ -246,11 +310,23 @@ rspamd_stat_cache_redis_check (struct rspamd_task *task,
                gpointer c)
 {
        struct rspamd_redis_cache_runtime *rt = runtime;
+       struct timeval tv;
        gchar *h;
 
        h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
        g_assert (h != NULL);
 
+       double_to_tv (rt->ctx->timeout, &tv);
+
+       if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_get, rt,
+                       "HGET %s %s",
+                       rt->ctx->redis_object, h) == REDIS_OK) {
+               rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
+                               rspamd_stat_cache_redis_quark ());
+               event_add (&rt->timeout_event, &tv);
+       }
+
+       /* We need to return OK every time */
        return RSPAMD_LEARN_OK;
 }
 
@@ -260,6 +336,26 @@ rspamd_stat_cache_redis_learn (struct rspamd_task *task,
                gpointer runtime,
                gpointer c)
 {
+       struct rspamd_redis_cache_runtime *rt = runtime;
+       struct timeval tv;
+       gchar *h;
+       gint flag;
+
+       h = rspamd_mempool_get_variable (task->task_pool, "words_hash");
+       g_assert (h != NULL);
+
+       double_to_tv (rt->ctx->timeout, &tv);
+       flag = (task->flags & RSPAMD_TASK_FLAG_LEARN_SPAM) ? 1 : -1;
+
+       if (redisAsyncCommand (rt->redis, rspamd_stat_cache_redis_set, rt,
+                       "HSET %s %s %d",
+                       rt->ctx->redis_object, h) == REDIS_OK) {
+               rspamd_session_add_event (task->s, rspamd_redis_cache_fin, rt,
+                               rspamd_stat_cache_redis_quark ());
+               event_add (&rt->timeout_event, &tv);
+       }
+
+       /* We need to return OK every time */
        return RSPAMD_LEARN_OK;
 }