From: Vsevolod Stakhov Date: Wed, 9 Nov 2011 16:30:57 +0000 (+0300) Subject: * Support redis unified protocol. X-Git-Tag: 0.4.5~12 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=4e1470d50f1ece3277dec6c770c5ea74d0db10b7;p=rspamd.git * Support redis unified protocol. --- diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index a5361057f..1b542458b 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -127,11 +127,62 @@ free_kvstorage_session (struct kvstorage_session *session) g_slice_free1 (sizeof (struct kvstorage_session), session); } -/** +/* * Parse kvstorage command */ static gboolean -parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) +parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len) +{ + if (len == 3) { + /* Set or get command */ + if ((c[0] == 'g' || c[0] == 'G') && + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 't' || c[2] == 'T')) { + session->command = KVSTORAGE_CMD_GET; + } + else if ((c[0] == 's' || c[0] == 'S') && + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 't' || c[2] == 'T')) { + session->command = KVSTORAGE_CMD_SET; + } + else if ((c[0] == 'd' || c[0] == 'D') && + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'l' || c[2] == 'L')) { + session->command = KVSTORAGE_CMD_DELETE; + } + else { + /* Error */ + return FALSE; + } + } + else if (len == 4) { + if (g_ascii_strncasecmp (c, "quit", 4) == 0) { + session->command = KVSTORAGE_CMD_QUIT; + } + if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) { + session->command = KVSTORAGE_CMD_SYNC; + } + } + else if (len == 6) { + if (g_ascii_strncasecmp (c, "delete", 6) == 0) { + session->command = KVSTORAGE_CMD_DELETE; + } + else if (g_ascii_strncasecmp (c, "select", 6) == 0) { + session->command = KVSTORAGE_CMD_SELECT; + } + else { + return FALSE; + } + } + + return TRUE; +} + +/** + * Parse kvstorage line + */ +static gboolean +parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) { gchar *p, *c, *end; gint state = 0, next_state; @@ -158,9 +209,17 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) next_state = 1; } else if (c == p) { - /* We have some character, so assume id as 0 and parse command */ - session->id = 0; - state = 1; + if (*p != '*') { + /* We have some character, so assume id as 0 and parse command */ + session->id = 0; + state = 1; + } + else { + /* In fact it is redis number of commands */ + c = ++p; + state = 7; + session->id = 0; + } } else { /* We have something wrong here (like some digits and then come non-digits) */ @@ -174,62 +233,24 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) p ++; } else { - if ((g_ascii_isspace (*p) || p == end) && p != c) { - /* We got some command, try to parse it */ - if (p - c == 3) { - /* Set or get command */ - if ((c[0] == 'g' || c[0] == 'G') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 't' || c[2] == 'T')) { - session->command = KVSTORAGE_CMD_GET; - } - else if ((c[0] == 's' || c[0] == 'S') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 't' || c[2] == 'T')) { - session->command = KVSTORAGE_CMD_SET; - } - else if ((c[0] == 'd' || c[0] == 'D') && - (c[1] == 'e' || c[1] == 'E') && - (c[2] == 'l' || c[2] == 'L')) { - session->command = KVSTORAGE_CMD_DELETE; - } - else { - /* Error */ - return FALSE; - } - } - else if (p - c == 4) { - if (g_ascii_strncasecmp (c, "quit", 4) == 0) { - session->command = KVSTORAGE_CMD_QUIT; - state = 100; - continue; - } - if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) { - session->command = KVSTORAGE_CMD_SYNC; - state = 100; - continue; - } - } - else if (p - c == 6) { - if (g_ascii_strncasecmp (c, "delete", 6) == 0) { - session->command = KVSTORAGE_CMD_DELETE; - } - else if (g_ascii_strncasecmp (c, "select", 6) == 0) { - session->command = KVSTORAGE_CMD_SELECT; - state = 99; - next_state = 6; - continue; - } - else { - return FALSE; - } - } - else { - return FALSE; + if (parse_kvstorage_command (session, c, p - c)) { + switch (session->command) { + + case KVSTORAGE_CMD_QUIT: + case KVSTORAGE_CMD_SYNC: + /* Single argument command */ + state = 100; + break; + case KVSTORAGE_CMD_SELECT: + /* Select command, read id next */ + state = 99; + next_state = 6; + break; + default: + /* Normal command, read key */ + state = 99; + next_state = 2; } - /* Skip spaces and try to parse key */ - state = 99; - next_state = 2; } else { /* Some error */ @@ -331,6 +352,24 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) } } break; + case 7: + /* Read arguments count */ + if (g_ascii_isdigit (*p)) { + p ++; + } + else { + if (g_ascii_isspace (*p) || end == p) { + session->argc = strtoul (c, NULL, 10); + session->argnum = 0; + state = 100; + /* Switch to arglen state */ + session->state = KVSTORAGE_STATE_READ_ARGLEN; + } + else { + return FALSE; + } + } + break; case 99: /* Skip spaces state */ if (g_ascii_isspace (*p)) { @@ -351,9 +390,214 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) return state == 100; } +/* Process normal kvstorage command */ +static gboolean +kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) +{ + gint r; + gchar outbuf[BUFSIZ]; + gboolean res; + struct rspamd_kv_element *elt; + + if (session->command == KVSTORAGE_CMD_SET) { + session->state = KVSTORAGE_STATE_READ_DATA; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length); + } + else if (session->command == KVSTORAGE_CMD_GET) { + g_static_rw_lock_reader_lock (&session->cf->storage->rwlock); + elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now); + if (elt == NULL) { + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "$-1" CRLF, + sizeof ("$-1" CRLF) - 1, FALSE, TRUE); + } + } + else { + session->elt = elt; + + if (!is_redis) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF, + ELT_KEY (elt), elt->flags, elt->size); + } + else { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF, + elt->size); + } + if (!rspamd_dispatcher_write (session->dispather, outbuf, + r, TRUE, FALSE)) { + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + return FALSE; + } + if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) { + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + return FALSE; + } + if (!is_redis) { + res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF, + sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE); + } + else { + res = rspamd_dispatcher_write (session->dispather, CRLF, + sizeof (CRLF) - 1, FALSE, TRUE); + } + if (!res) { + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + } + + return res; + } + } + else if (session->command == KVSTORAGE_CMD_DELETE) { + g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); + elt = rspamd_kv_storage_delete (session->cf->storage, session->key); + if (elt != NULL) { + if ((elt->flags & KV_ELT_DIRTY) == 0) { + /* Free memory if backend has deleted this element */ + g_slice_free1 (ELT_SIZE (elt), elt); + } + g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF, + sizeof ("DELETED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, ":1" CRLF, + sizeof (":1" CRLF) - 1, FALSE, TRUE); + } + } + else { + g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, ":0" CRLF, + sizeof (":0" CRLF) - 1, FALSE, TRUE); + } + } + } + else if (session->command == KVSTORAGE_CMD_SYNC) { + if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_COMMON, + sizeof (ERROR_COMMON) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF, + sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE); + } + } + else { + if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF, + sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + } + } + else { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF, + sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF, + sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE); + } + } + } + g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); + } + else if (session->command == KVSTORAGE_CMD_SELECT) { + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF, + sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + } + } + else if (session->command == KVSTORAGE_CMD_QUIT) { + /* Quit session */ + free_kvstorage_session (session); + return FALSE; + } + + return TRUE; +} + +static gboolean +kvstorage_read_arglen (f_str_t *in, guint *len) +{ + gchar *p = in->begin, *end = in->begin + in->len, *c; + gint state = 0; + + while (p < end) { + switch (state) { + case 0: + if (*p != '$') { + return FALSE; + } + else { + p ++; + c = p; + state = 1; + } + break; + case 1: + if (g_ascii_isdigit (*p) && p != end - 1) { + p ++; + } + else { + if (p != end - 1) { + return FALSE; + } + else { + *len = strtoul (c, NULL, 10); + return TRUE; + } + } + break; + } + } + + return TRUE; +} + +/* + * Check number of arguments for a command + */ +static gboolean +kvstorage_check_argnum (struct kvstorage_session *session) +{ + switch (session->command) { + case KVSTORAGE_CMD_QUIT: + case KVSTORAGE_CMD_SYNC: + return session->argc == 1; + case KVSTORAGE_CMD_SET: + return session->argc == 3; + default: + return session->argc == 2; + } + + /* Unreachable */ + return FALSE; +} + /** * Dispatcher callbacks */ + /* * Callback that is called when there is data to read in buffer */ @@ -362,10 +606,10 @@ kvstorage_read_socket (f_str_t * in, void *arg) { struct kvstorage_session *session = (struct kvstorage_session *) arg; struct kvstorage_worker_thread *thr; - struct rspamd_kv_element *elt; gint r; + guint arglen; gchar outbuf[BUFSIZ]; - gboolean is_redis, res; + gboolean is_redis; if (in->len == 0) { /* Skip empty commands */ @@ -378,7 +622,7 @@ kvstorage_read_socket (f_str_t * in, void *arg) case KVSTORAGE_STATE_READ_CMD: /* Update timestamp */ session->now = time (NULL); - if (! parse_kvstorage_command (session, in)) { + if (! parse_kvstorage_line (session, in)) { thr_info ("%ud: unknown command: %V", thr->id, in); if (!is_redis) { return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND, @@ -403,138 +647,84 @@ kvstorage_read_socket (f_str_t * in, void *arg) sizeof ("-ERR unknown keystorage" CRLF) - 1, FALSE, TRUE); } } - if (session->command == KVSTORAGE_CMD_SET) { - session->state = KVSTORAGE_STATE_READ_DATA; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length); - } - else if (session->command == KVSTORAGE_CMD_GET) { - g_static_rw_lock_reader_lock (&session->cf->storage->rwlock); - elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now); - if (elt == NULL) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); - if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); - } - else { - return rspamd_dispatcher_write (session->dispather, "$-1" CRLF, - sizeof ("$-1" CRLF) - 1, FALSE, TRUE); - } - } - else { - session->elt = elt; - - if (!is_redis) { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF, - ELT_KEY (elt), elt->flags, elt->size); - } - else { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF, - elt->size); - } - if (!rspamd_dispatcher_write (session->dispather, outbuf, - r, TRUE, FALSE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); - return FALSE; - } - if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); - return FALSE; - } - if (!is_redis) { - res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF, - sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE); - } - else { - res = rspamd_dispatcher_write (session->dispather, CRLF, - sizeof (CRLF) - 1, FALSE, TRUE); - } - if (!res) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); - } - - return res; - } + if (session->state != KVSTORAGE_STATE_READ_ARGLEN) { + return kvstorage_process_command (session, is_redis); } - else if (session->command == KVSTORAGE_CMD_DELETE) { - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); - elt = rspamd_kv_storage_delete (session->cf->storage, session->key); - if (elt != NULL) { - if ((elt->flags & KV_ELT_DIRTY) == 0) { - /* Free memory if backend has deleted this element */ - g_slice_free1 (ELT_SIZE (elt), elt); - } - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); - if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF, - sizeof ("DELETED" CRLF) - 1, FALSE, TRUE); - } - else { - return rspamd_dispatcher_write (session->dispather, ":1" CRLF, - sizeof (":1" CRLF) - 1, FALSE, TRUE); - } - } - else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); - if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); - } - else { - return rspamd_dispatcher_write (session->dispather, ":0" CRLF, - sizeof (":0" CRLF) - 1, FALSE, TRUE); - } - } + } + break; + case KVSTORAGE_STATE_READ_ARGLEN: + if (! kvstorage_read_arglen (in, &arglen)) { + session->state = KVSTORAGE_STATE_READ_CMD; + r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown arglen '%V'" CRLF, in); + return rspamd_dispatcher_write (session->dispather, outbuf, + r, FALSE, TRUE); + } + else { + session->state = KVSTORAGE_STATE_READ_ARG; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, arglen); + } + break; + case KVSTORAGE_STATE_READ_ARG: + if (session->argnum == 0) { + /* Read command */ + if (! parse_kvstorage_command (session, in->begin, in->len)) { + session->state = KVSTORAGE_STATE_READ_CMD; + r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in); + return rspamd_dispatcher_write (session->dispather, outbuf, + r, FALSE, TRUE); } - else if (session->command == KVSTORAGE_CMD_SYNC) { - if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) { - if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, ERROR_COMMON, - sizeof (ERROR_COMMON) - 1, FALSE, TRUE); - } - else { - return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF, - sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE); - } + else { + if (! kvstorage_check_argnum (session)) { + session->state = KVSTORAGE_STATE_READ_CMD; + r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR invalid argnum for command '%V': %ud" CRLF, + in, session->argc); + return rspamd_dispatcher_write (session->dispather, outbuf, + r, FALSE, TRUE); } else { - if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) { - if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF, - sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE); - } - else { - return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); - } - } - else { - if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF, - sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE); - } - else { - return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF, - sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE); - } - } + session->argnum ++; + session->state = KVSTORAGE_STATE_READ_ARGLEN; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); } - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); } - else if (session->command == KVSTORAGE_CMD_SELECT) { - if (!is_redis) { - return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF, - sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE); + } + else if (session->argnum == 1) { + if (session->command != KVSTORAGE_CMD_SELECT) { + /* This argument is a key for normal command */ + session->key = memory_pool_fstrdup (session->pool, in); + if (session->argnum == session->argc - 1) { + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + return kvstorage_process_command (session, TRUE); } else { - return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + session->argnum ++; + session->state = KVSTORAGE_STATE_READ_ARGLEN; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); } } - else if (session->command == KVSTORAGE_CMD_QUIT) { - /* Quit session */ - free_kvstorage_session (session); - return FALSE; + else { + /* Special case for select command */ + rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), in->len)); + session->id = strtoul (outbuf, NULL, 10); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + return kvstorage_process_command (session, TRUE); + } + } + else if (session->argnum == 2) { + /* We get datablock for set command */ + session->state = KVSTORAGE_STATE_READ_CMD; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); + if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len, + session->flags, session->expire)) { + g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + } + else { + g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); + return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); } } break; @@ -556,8 +746,14 @@ kvstorage_read_socket (f_str_t * in, void *arg) } else { g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); - return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED, - sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED, + sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); + } } break; diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h index 2d4a3fa89..a7c8bd8a6 100644 --- a/src/kvstorage_server.h +++ b/src/kvstorage_server.h @@ -59,6 +59,8 @@ struct kvstorage_session { rspamd_io_dispatcher_t *dispather; enum { KVSTORAGE_STATE_READ_CMD, + KVSTORAGE_STATE_READ_ARGLEN, + KVSTORAGE_STATE_READ_ARG, KVSTORAGE_STATE_READ_DATA } state; enum { @@ -70,6 +72,8 @@ struct kvstorage_session { KVSTORAGE_CMD_QUIT } command; guint id; + guint argc; + guint argnum; memory_pool_t *pool; gchar *key; struct kvstorage_config *cf;