]> source.dussan.org Git - rspamd.git/commitdiff
* Support redis API emulation
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 4 Nov 2011 15:48:07 +0000 (18:48 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 4 Nov 2011 15:48:07 +0000 (18:48 +0300)
src/kvstorage.c
src/kvstorage_server.c
src/kvstorage_server.h

index f5b314e2994539d60534b2999191f197e494d6e4..afab4a83bf6d6ddce54b1b74fa44adba49e64e9d 100644 (file)
@@ -259,7 +259,7 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, time_
                }
        }
 
-       if (elt && (elt->flags & KV_ELT_PERSISTENT) == 0) {
+       if (elt && (elt->flags & KV_ELT_PERSISTENT) == 0 && elt->expire > 0) {
                /* Check expiration */
                if (now - elt->age > elt->expire) {
                        rspamd_kv_storage_delete (storage, key);
index 0ac84f7a37dcbbc954233c955e63b0b5148d779f..c6190454c3a55cc3579d867b9d3557449dedaa2e 100644 (file)
@@ -37,6 +37,8 @@
 #define ERROR_NOT_FOUND "NOT_FOUND" CRLF
 #define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF
 
+#define ERROR_REDIS_OK "+OK" CRLF
+
 
 static sig_atomic_t wanna_die = 0;
 static sig_atomic_t do_reopen_log = 0;
@@ -94,8 +96,10 @@ init_kvstorage_worker (void)
        /* Set default values */
        ctx->timeout_raw = 300000;
 
-       register_worker_opt (TYPE_SMTP, "timeout", xml_handle_seconds, ctx,
+       register_worker_opt (TYPE_KVSTORAGE, "timeout", xml_handle_seconds, ctx,
                                        G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw));
+       register_worker_opt (TYPE_KVSTORAGE, "redis", xml_handle_boolean, ctx,
+                                               G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis));
        return ctx;
 }
 
@@ -131,10 +135,12 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
 {
        gchar                                                           *p, *c, *end;
        gint                                                             state = 0, next_state;
+       gboolean                                                         is_redis;
 
        p = in->begin;
        end = in->begin + in->len;
        c = p;
+       is_redis = session->thr->ctx->is_redis;
 
        /* State machine for parsing */
        while (p <= end) {
@@ -172,12 +178,21 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
                                        /* We got some command, try to parse it */
                                        if (p - c == 3) {
                                                /* Set or get command */
-                                               if (memcmp (c, "get", 3) == 0) {
+                                               if ((c[0] == 'g' || c[0] == 'G') &&
+                                                       (c[1] == 'e' || c[1] == 'E') &&
+                                                       (c[2] == 't' || c[2] == 'T')) {
                                                        session->command = KVSTORAGE_CMD_GET;
                                                }
-                                               else if (memcmp (c, "set", 3) == 0) {
+                                               else if ((c[0] == 's' || c[0] == 'S') &&
+                                                               (c[1] == 'e' || c[1] == 'E') &&
+                                                               (c[2] == 't' || c[2] == 'T')) {
                                                        session->command = KVSTORAGE_CMD_SET;
                                                }
+                                               else if ((c[0] == 'd' || c[0] == 'D') &&
+                                                               (c[1] == 'e' || c[1] == 'E') &&
+                                                               (c[2] == 'l' || c[2] == 'L')) {
+                                                       session->command = KVSTORAGE_CMD_DELETE;
+                                               }
                                                else {
                                                        /* Error */
                                                        return FALSE;
@@ -227,7 +242,14 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
                                        if (session->command == KVSTORAGE_CMD_SET) {
                                                /* Read flags */
                                                state = 99;
-                                               next_state = 3;
+                                               if (is_redis) {
+                                                       next_state = 5;
+                                                       session->flags = 0;
+                                                       session->expire = 0;
+                                               }
+                                               else {
+                                                       next_state = 3;
+                                               }
                                        }
                                        else {
                                                /* Nothing to read for other commands */
@@ -317,20 +339,30 @@ kvstorage_read_socket (f_str_t * in, void *arg)
        struct rspamd_kv_element                        *elt;
        gint                                                             r;
        gchar                                                            outbuf[BUFSIZ];
+       gboolean                                                         is_redis;
 
        if (in->len == 0) {
                /* Skip empty commands */
                return TRUE;
        }
        thr = session->thr;
+       is_redis = thr->ctx->is_redis;
+
        switch (session->state) {
        case KVSTORAGE_STATE_READ_CMD:
                /* Update timestamp */
                session->now = time (NULL);
                if (! parse_kvstorage_command (session, in)) {
                        thr_info ("%ud: unknown command: %V", thr->id, in);
-                       return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
-                                       sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE);
+                       if (!is_redis) {
+                               return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
+                                               sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE);
+                       }
+                       else {
+                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in);
+                               return rspamd_dispatcher_write (session->dispather, outbuf,
+                                                               r, FALSE, TRUE);
+                       }
                }
                else {
                        session->cf = get_kvstorage_config (session->id);
@@ -348,12 +380,24 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                                elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now);
                                g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
                                if (elt == NULL) {
-                                       return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
-                                                                                                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+                                       if (!is_redis) {
+                                               return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+                                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+                                       }
+                                       else {
+                                               return rspamd_dispatcher_write (session->dispather, "$-1" CRLF,
+                                                               sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
+                                       }
                                }
                                else {
-                                       r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
-                                                       ELT_KEY (elt), elt->flags, elt->size);
+                                       if (!is_redis) {
+                                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
+                                                               ELT_KEY (elt), elt->flags, elt->size);
+                                       }
+                                       else {
+                                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
+                                                               elt->size);
+                                       }
                                        if (!rspamd_dispatcher_write (session->dispather, outbuf,
                                                                                                                                r, TRUE, FALSE)) {
                                                return FALSE;
@@ -361,8 +405,14 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                                        if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) {
                                                return FALSE;
                                        }
-                                       return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
-                                                       sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+                                       if (!is_redis) {
+                                               return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
+                                                               sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+                                       }
+                                       else {
+                                               return rspamd_dispatcher_write (session->dispather, CRLF,
+                                                               sizeof (CRLF) - 1, FALSE, TRUE);
+                                       }
                                }
                        }
                        else if (session->command == KVSTORAGE_CMD_DELETE) {
@@ -374,13 +424,25 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                                                g_slice_free1 (ELT_SIZE (elt), elt);
                                        }
                                        g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
-                                       return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
-                                                                                                                               sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+                                       if (!is_redis) {
+                                               return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
+                                                               sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+                                       }
+                                       else {
+                                               return rspamd_dispatcher_write (session->dispather, ":1" CRLF,
+                                                               sizeof (":1" CRLF) - 1, FALSE, TRUE);
+                                       }
                                }
                                else {
                                        g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
-                                       return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
-                                                                                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+                                       if (!is_redis) {
+                                               return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+                                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+                                       }
+                                       else {
+                                               return rspamd_dispatcher_write (session->dispather, ":0" CRLF,
+                                                               sizeof (":0" CRLF) - 1, FALSE, TRUE);
+                                       }
                                }
                        }
                        else if (session->command == KVSTORAGE_CMD_QUIT) {
@@ -397,8 +459,14 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
                                session->flags, session->expire)) {
                        g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
-                       return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
-                                                                                       sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
+                       if (!is_redis) {
+                               return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
+                                               sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
+                       }
+                       else {
+                               return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+                                               sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+                       }
                }
                else {
                        g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
index 808ead805cc7b941c0f58e8afc6ce6f96635b182..b513d33ab14ced14877eb354eff00d4654ebd7e4 100644 (file)
@@ -35,6 +35,7 @@ struct kvstorage_worker_ctx {
        guint32 timeout_raw;
        GList *threads;
        gint s_pair[2];
+       gboolean is_redis;
        memory_pool_t *pool;
        struct event_base *ev_base;
        GStaticMutex log_mtx;