]> source.dussan.org Git - rspamd.git/commitdiff
* Support redis unified protocol.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 9 Nov 2011 16:30:57 +0000 (19:30 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 9 Nov 2011 16:30:57 +0000 (19:30 +0300)
src/kvstorage_server.c
src/kvstorage_server.h

index a5361057fe301df54ccc777caa2a2f3ef7de1545..1b542458b2b37692e4c92447399db1305934a56d 100644 (file)
@@ -127,11 +127,62 @@ free_kvstorage_session (struct kvstorage_session *session)
        g_slice_free1 (sizeof (struct kvstorage_session), session);
 }
 
-/**
+/*
  * Parse kvstorage command
  */
 static gboolean
-parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
+parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len)
+{
+       if (len == 3) {
+               /* Set or get command */
+               if ((c[0] == 'g' || c[0] == 'G') &&
+                               (c[1] == 'e' || c[1] == 'E') &&
+                               (c[2] == 't' || c[2] == 'T')) {
+                       session->command = KVSTORAGE_CMD_GET;
+               }
+               else if ((c[0] == 's' || c[0] == 'S') &&
+                               (c[1] == 'e' || c[1] == 'E') &&
+                               (c[2] == 't' || c[2] == 'T')) {
+                       session->command = KVSTORAGE_CMD_SET;
+               }
+               else if ((c[0] == 'd' || c[0] == 'D') &&
+                               (c[1] == 'e' || c[1] == 'E') &&
+                               (c[2] == 'l' || c[2] == 'L')) {
+                       session->command = KVSTORAGE_CMD_DELETE;
+               }
+               else {
+                       /* Error */
+                       return FALSE;
+               }
+       }
+       else if (len == 4) {
+               if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
+                       session->command = KVSTORAGE_CMD_QUIT;
+               }
+               if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
+                       session->command = KVSTORAGE_CMD_SYNC;
+               }
+       }
+       else if (len == 6) {
+               if (g_ascii_strncasecmp (c, "delete", 6) == 0) {
+                       session->command = KVSTORAGE_CMD_DELETE;
+               }
+               else if (g_ascii_strncasecmp (c, "select", 6) == 0) {
+                       session->command = KVSTORAGE_CMD_SELECT;
+               }
+               else {
+                       return FALSE;
+               }
+       }
+
+       return TRUE;
+}
+
+/**
+ * Parse kvstorage line
+ */
+static gboolean
+parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
 {
        gchar                                                           *p, *c, *end;
        gint                                                             state = 0, next_state;
@@ -158,9 +209,17 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
                                        next_state = 1;
                                }
                                else if (c == p) {
-                                       /* We have some character, so assume id as 0 and parse command */
-                                       session->id = 0;
-                                       state = 1;
+                                       if (*p != '*') {
+                                               /* We have some character, so assume id as 0 and parse command */
+                                               session->id = 0;
+                                               state = 1;
+                                       }
+                                       else {
+                                               /* In fact it is redis number of commands */
+                                               c = ++p;
+                                               state = 7;
+                                               session->id = 0;
+                                       }
                                }
                                else {
                                        /* We have something wrong here (like some digits and then come non-digits) */
@@ -174,62 +233,24 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
                                p ++;
                        }
                        else {
-                               if ((g_ascii_isspace (*p) || p == end) && p != c) {
-                                       /* We got some command, try to parse it */
-                                       if (p - c == 3) {
-                                               /* Set or get command */
-                                               if ((c[0] == 'g' || c[0] == 'G') &&
-                                                       (c[1] == 'e' || c[1] == 'E') &&
-                                                       (c[2] == 't' || c[2] == 'T')) {
-                                                       session->command = KVSTORAGE_CMD_GET;
-                                               }
-                                               else if ((c[0] == 's' || c[0] == 'S') &&
-                                                               (c[1] == 'e' || c[1] == 'E') &&
-                                                               (c[2] == 't' || c[2] == 'T')) {
-                                                       session->command = KVSTORAGE_CMD_SET;
-                                               }
-                                               else if ((c[0] == 'd' || c[0] == 'D') &&
-                                                               (c[1] == 'e' || c[1] == 'E') &&
-                                                               (c[2] == 'l' || c[2] == 'L')) {
-                                                       session->command = KVSTORAGE_CMD_DELETE;
-                                               }
-                                               else {
-                                                       /* Error */
-                                                       return FALSE;
-                                               }
-                                       }
-                                       else if (p - c == 4) {
-                                               if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
-                                                       session->command = KVSTORAGE_CMD_QUIT;
-                                                       state = 100;
-                                                       continue;
-                                               }
-                                               if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
-                                                       session->command = KVSTORAGE_CMD_SYNC;
-                                                       state = 100;
-                                                       continue;
-                                               }
-                                       }
-                                       else if (p - c == 6) {
-                                               if (g_ascii_strncasecmp (c, "delete", 6) == 0) {
-                                                       session->command = KVSTORAGE_CMD_DELETE;
-                                               }
-                                               else if (g_ascii_strncasecmp (c, "select", 6) == 0) {
-                                                       session->command = KVSTORAGE_CMD_SELECT;
-                                                       state = 99;
-                                                       next_state = 6;
-                                                       continue;
-                                               }
-                                               else {
-                                                       return FALSE;
-                                               }
-                                       }
-                                       else {
-                                               return FALSE;
+                               if (parse_kvstorage_command (session, c, p - c)) {
+                                       switch (session->command) {
+
+                                       case KVSTORAGE_CMD_QUIT:
+                                       case KVSTORAGE_CMD_SYNC:
+                                               /* Single argument command */
+                                               state = 100;
+                                               break;
+                                       case KVSTORAGE_CMD_SELECT:
+                                               /* Select command, read id next */
+                                               state = 99;
+                                               next_state = 6;
+                                               break;
+                                       default:
+                                               /* Normal command, read key */
+                                               state = 99;
+                                               next_state = 2;
                                        }
-                                       /* Skip spaces and try to parse key */
-                                       state = 99;
-                                       next_state = 2;
                                }
                                else {
                                        /* Some error */
@@ -331,6 +352,24 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
                                }
                        }
                        break;
+               case 7:
+                       /* Read arguments count */
+                       if (g_ascii_isdigit (*p)) {
+                               p ++;
+                       }
+                       else {
+                               if (g_ascii_isspace (*p) || end == p) {
+                                       session->argc = strtoul (c, NULL, 10);
+                                       session->argnum = 0;
+                                       state = 100;
+                                       /* Switch to arglen state */
+                                       session->state = KVSTORAGE_STATE_READ_ARGLEN;
+                               }
+                               else {
+                                       return FALSE;
+                               }
+                       }
+                       break;
                case 99:
                        /* Skip spaces state */
                        if (g_ascii_isspace (*p)) {
@@ -351,9 +390,214 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
        return state == 100;
 }
 
+/* Process normal kvstorage command */
+static gboolean
+kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
+{
+       gint                                                             r;
+       gchar                                                            outbuf[BUFSIZ];
+       gboolean                                                         res;
+       struct rspamd_kv_element                        *elt;
+
+       if (session->command == KVSTORAGE_CMD_SET) {
+               session->state = KVSTORAGE_STATE_READ_DATA;
+               rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length);
+       }
+       else if (session->command == KVSTORAGE_CMD_GET) {
+               g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
+               elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now);
+               if (elt == NULL) {
+                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                       if (!is_redis) {
+                               return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+                       }
+                       else {
+                               return rspamd_dispatcher_write (session->dispather, "$-1" CRLF,
+                                               sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
+                       }
+               }
+               else {
+                       session->elt = elt;
+
+                       if (!is_redis) {
+                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
+                                               ELT_KEY (elt), elt->flags, elt->size);
+                       }
+                       else {
+                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
+                                               elt->size);
+                       }
+                       if (!rspamd_dispatcher_write (session->dispather, outbuf,
+                                       r, TRUE, FALSE)) {
+                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                               return FALSE;
+                       }
+                       if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) {
+                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                               return FALSE;
+                       }
+                       if (!is_redis) {
+                               res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
+                                               sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+                       }
+                       else {
+                               res = rspamd_dispatcher_write (session->dispather, CRLF,
+                                               sizeof (CRLF) - 1, FALSE, TRUE);
+                       }
+                       if (!res) {
+                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+                       }
+
+                       return res;
+               }
+       }
+       else if (session->command == KVSTORAGE_CMD_DELETE) {
+               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+               elt = rspamd_kv_storage_delete (session->cf->storage, session->key);
+               if (elt != NULL) {
+                       if ((elt->flags & KV_ELT_DIRTY) == 0) {
+                               /* Free memory if backend has deleted this element */
+                               g_slice_free1 (ELT_SIZE (elt), elt);
+                       }
+                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+                       if (!is_redis) {
+                               return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
+                                               sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+                       }
+                       else {
+                               return rspamd_dispatcher_write (session->dispather, ":1" CRLF,
+                                               sizeof (":1" CRLF) - 1, FALSE, TRUE);
+                       }
+               }
+               else {
+                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+                       if (!is_redis) {
+                               return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+                       }
+                       else {
+                               return rspamd_dispatcher_write (session->dispather, ":0" CRLF,
+                                               sizeof (":0" CRLF) - 1, FALSE, TRUE);
+                       }
+               }
+       }
+       else if (session->command == KVSTORAGE_CMD_SYNC) {
+               if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
+                       if (!is_redis) {
+                               return rspamd_dispatcher_write (session->dispather, ERROR_COMMON,
+                                               sizeof (ERROR_COMMON) - 1, FALSE, TRUE);
+                       }
+                       else {
+                               return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF,
+                                               sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE);
+                       }
+               }
+               else {
+                       if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) {
+                               if (!is_redis) {
+                                       return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF,
+                                                       sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE);
+                               }
+                               else {
+                                       return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+                                                       sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+                               }
+                       }
+                       else {
+                               if (!is_redis) {
+                                       return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF,
+                                                       sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE);
+                               }
+                               else {
+                                       return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF,
+                                                       sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE);
+                               }
+                       }
+               }
+               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+       }
+       else if (session->command == KVSTORAGE_CMD_SELECT) {
+               if (!is_redis) {
+                       return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF,
+                                       sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE);
+               }
+               else {
+                       return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+                                       sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+               }
+       }
+       else if (session->command == KVSTORAGE_CMD_QUIT) {
+               /* Quit session */
+               free_kvstorage_session (session);
+               return FALSE;
+       }
+
+       return TRUE;
+}
+
+static gboolean
+kvstorage_read_arglen (f_str_t *in, guint *len)
+{
+       gchar                                                           *p = in->begin, *end = in->begin + in->len, *c;
+       gint                                                             state = 0;
+
+       while (p < end) {
+               switch (state) {
+               case 0:
+                       if (*p != '$') {
+                               return FALSE;
+                       }
+                       else {
+                               p ++;
+                               c = p;
+                               state = 1;
+                       }
+                       break;
+               case 1:
+                       if (g_ascii_isdigit (*p) && p != end - 1) {
+                               p ++;
+                       }
+                       else {
+                               if (p != end - 1) {
+                                       return FALSE;
+                               }
+                               else {
+                                       *len = strtoul (c, NULL, 10);
+                                       return TRUE;
+                               }
+                       }
+                       break;
+               }
+       }
+
+       return TRUE;
+}
+
+/*
+ * Check number of arguments for a command
+ */
+static gboolean
+kvstorage_check_argnum (struct kvstorage_session *session)
+{
+       switch (session->command) {
+       case KVSTORAGE_CMD_QUIT:
+       case KVSTORAGE_CMD_SYNC:
+               return session->argc == 1;
+       case KVSTORAGE_CMD_SET:
+               return session->argc == 3;
+       default:
+               return session->argc == 2;
+       }
+
+       /* Unreachable */
+       return FALSE;
+}
+
 /**
  * Dispatcher callbacks
  */
+
 /*
  * Callback that is called when there is data to read in buffer
  */
@@ -362,10 +606,10 @@ kvstorage_read_socket (f_str_t * in, void *arg)
 {
        struct kvstorage_session                        *session = (struct kvstorage_session *) arg;
        struct kvstorage_worker_thread          *thr;
-       struct rspamd_kv_element                        *elt;
        gint                                                             r;
+       guint                                                            arglen;
        gchar                                                            outbuf[BUFSIZ];
-       gboolean                                                         is_redis, res;
+       gboolean                                                         is_redis;
 
        if (in->len == 0) {
                /* Skip empty commands */
@@ -378,7 +622,7 @@ kvstorage_read_socket (f_str_t * in, void *arg)
        case KVSTORAGE_STATE_READ_CMD:
                /* Update timestamp */
                session->now = time (NULL);
-               if (! parse_kvstorage_command (session, in)) {
+               if (! parse_kvstorage_line (session, in)) {
                        thr_info ("%ud: unknown command: %V", thr->id, in);
                        if (!is_redis) {
                                return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
@@ -403,138 +647,84 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                                                sizeof ("-ERR unknown keystorage" CRLF) - 1, FALSE, TRUE);
                                }
                        }
-                       if (session->command == KVSTORAGE_CMD_SET) {
-                               session->state = KVSTORAGE_STATE_READ_DATA;
-                               rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length);
-                       }
-                       else if (session->command == KVSTORAGE_CMD_GET) {
-                               g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
-                               elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now);
-                               if (elt == NULL) {
-                                       g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
-                                       if (!is_redis) {
-                                               return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
-                                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
-                                       }
-                                       else {
-                                               return rspamd_dispatcher_write (session->dispather, "$-1" CRLF,
-                                                               sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
-                                       }
-                               }
-                               else {
-                                       session->elt = elt;
-
-                                       if (!is_redis) {
-                                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
-                                                               ELT_KEY (elt), elt->flags, elt->size);
-                                       }
-                                       else {
-                                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
-                                                               elt->size);
-                                       }
-                                       if (!rspamd_dispatcher_write (session->dispather, outbuf,
-                                                                                                                               r, TRUE, FALSE)) {
-                                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
-                                               return FALSE;
-                                       }
-                                       if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) {
-                                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
-                                               return FALSE;
-                                       }
-                                       if (!is_redis) {
-                                               res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
-                                                               sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
-                                       }
-                                       else {
-                                               res = rspamd_dispatcher_write (session->dispather, CRLF,
-                                                               sizeof (CRLF) - 1, FALSE, TRUE);
-                                       }
-                                       if (!res) {
-                                               g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
-                                       }
-
-                                       return res;
-                               }
+                       if (session->state != KVSTORAGE_STATE_READ_ARGLEN) {
+                               return kvstorage_process_command (session, is_redis);
                        }
-                       else if (session->command == KVSTORAGE_CMD_DELETE) {
-                               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
-                               elt = rspamd_kv_storage_delete (session->cf->storage, session->key);
-                               if (elt != NULL) {
-                                       if ((elt->flags & KV_ELT_DIRTY) == 0) {
-                                               /* Free memory if backend has deleted this element */
-                                               g_slice_free1 (ELT_SIZE (elt), elt);
-                                       }
-                                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
-                                       if (!is_redis) {
-                                               return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
-                                                               sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
-                                       }
-                                       else {
-                                               return rspamd_dispatcher_write (session->dispather, ":1" CRLF,
-                                                               sizeof (":1" CRLF) - 1, FALSE, TRUE);
-                                       }
-                               }
-                               else {
-                                       g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
-                                       if (!is_redis) {
-                                               return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
-                                                               sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
-                                       }
-                                       else {
-                                               return rspamd_dispatcher_write (session->dispather, ":0" CRLF,
-                                                               sizeof (":0" CRLF) - 1, FALSE, TRUE);
-                                       }
-                               }
+               }
+               break;
+       case KVSTORAGE_STATE_READ_ARGLEN:
+               if (! kvstorage_read_arglen (in, &arglen)) {
+                       session->state = KVSTORAGE_STATE_READ_CMD;
+                       r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown arglen '%V'" CRLF, in);
+                       return rspamd_dispatcher_write (session->dispather, outbuf,
+                                       r, FALSE, TRUE);
+               }
+               else {
+                       session->state = KVSTORAGE_STATE_READ_ARG;
+                       rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, arglen);
+               }
+               break;
+       case KVSTORAGE_STATE_READ_ARG:
+               if (session->argnum == 0) {
+                       /* Read command */
+                       if (! parse_kvstorage_command (session, in->begin, in->len)) {
+                               session->state = KVSTORAGE_STATE_READ_CMD;
+                               r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in);
+                               return rspamd_dispatcher_write (session->dispather, outbuf,
+                                               r, FALSE, TRUE);
                        }
-                       else if (session->command == KVSTORAGE_CMD_SYNC) {
-                               if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
-                                       if (!is_redis) {
-                                               return rspamd_dispatcher_write (session->dispather, ERROR_COMMON,
-                                                               sizeof (ERROR_COMMON) - 1, FALSE, TRUE);
-                                       }
-                                       else {
-                                               return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF,
-                                                               sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE);
-                                       }
+                       else {
+                               if (! kvstorage_check_argnum (session)) {
+                                       session->state = KVSTORAGE_STATE_READ_CMD;
+                                       r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR invalid argnum for command '%V': %ud" CRLF,
+                                                       in, session->argc);
+                                       return rspamd_dispatcher_write (session->dispather, outbuf,
+                                                       r, FALSE, TRUE);
                                }
                                else {
-                                       if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) {
-                                               if (!is_redis) {
-                                                       return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF,
-                                                                       sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE);
-                                               }
-                                               else {
-                                                       return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
-                                                                       sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
-                                               }
-                                       }
-                                       else {
-                                               if (!is_redis) {
-                                                       return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF,
-                                                                       sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE);
-                                               }
-                                               else {
-                                                       return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF,
-                                                                       sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE);
-                                               }
-                                       }
+                                       session->argnum ++;
+                                       session->state = KVSTORAGE_STATE_READ_ARGLEN;
+                                       rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
                                }
-                               g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
                        }
-                       else if (session->command == KVSTORAGE_CMD_SELECT) {
-                               if (!is_redis) {
-                                       return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF,
-                                                                                               sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE);
+               }
+               else if (session->argnum == 1) {
+                       if (session->command != KVSTORAGE_CMD_SELECT) {
+                               /* This argument is a key for normal command */
+                               session->key = memory_pool_fstrdup (session->pool, in);
+                               if (session->argnum == session->argc - 1) {
+                                       rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+                                       return kvstorage_process_command (session, TRUE);
                                }
                                else {
-                                       return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
-                                                       sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+                                       session->argnum ++;
+                                       session->state = KVSTORAGE_STATE_READ_ARGLEN;
+                                       rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
                                }
                        }
-                       else if (session->command == KVSTORAGE_CMD_QUIT) {
-                               /* Quit session */
-                               free_kvstorage_session (session);
-                               return FALSE;
+                       else {
+                               /* Special case for select command */
+                               rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), in->len));
+                               session->id = strtoul (outbuf, NULL, 10);
+                               rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+                               return kvstorage_process_command (session, TRUE);
+                       }
+               }
+               else if (session->argnum == 2) {
+                       /* We get datablock for set command */
+                       session->state = KVSTORAGE_STATE_READ_CMD;
+                       rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+                       g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+                       if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
+                                       session->flags, session->expire)) {
+                               g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+                               return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+                                               sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+                       }
+                       else {
+                               g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+                               return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
+                                               sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
                        }
                }
                break;
@@ -556,8 +746,14 @@ kvstorage_read_socket (f_str_t * in, void *arg)
                }
                else {
                        g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
-                       return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
-                                                                       sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
+                       if (!is_redis) {
+                               return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
+                                               sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
+                       }
+                       else {
+                               return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
+                                               sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+                       }
                }
 
                break;
index 2d4a3fa891fdb7c615bc575e2414eaf59e0a700f..a7c8bd8a6327f236b74bc0a7ed7de21bc2bf32b1 100644 (file)
@@ -59,6 +59,8 @@ struct kvstorage_session {
        rspamd_io_dispatcher_t *dispather;
        enum {
                KVSTORAGE_STATE_READ_CMD,
+               KVSTORAGE_STATE_READ_ARGLEN,
+               KVSTORAGE_STATE_READ_ARG,
                KVSTORAGE_STATE_READ_DATA
        } state;
        enum {
@@ -70,6 +72,8 @@ struct kvstorage_session {
                KVSTORAGE_CMD_QUIT
        } command;
        guint id;
+       guint argc;
+       guint argnum;
        memory_pool_t *pool;
        gchar *key;
        struct kvstorage_config *cf;