aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-09 19:30:57 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-09 19:30:57 +0300
commit4e1470d50f1ece3277dec6c770c5ea74d0db10b7 (patch)
tree63e61d8a721769e0233fe860e204839c08eed91a
parentcf21ad184448908536c32495db26f97bffd3f584 (diff)
downloadrspamd-4e1470d50f1ece3277dec6c770c5ea74d0db10b7.tar.gz
rspamd-4e1470d50f1ece3277dec6c770c5ea74d0db10b7.zip
* Support redis unified protocol.
-rw-r--r--src/kvstorage_server.c570
-rw-r--r--src/kvstorage_server.h4
2 files changed, 387 insertions, 187 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index a5361057f..1b542458b 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -127,11 +127,62 @@ free_kvstorage_session (struct kvstorage_session *session)
g_slice_free1 (sizeof (struct kvstorage_session), session);
}
-/**
+/*
* Parse kvstorage command
*/
static gboolean
-parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
+parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len)
+{
+ if (len == 3) {
+ /* Set or get command */
+ if ((c[0] == 'g' || c[0] == 'G') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 't' || c[2] == 'T')) {
+ session->command = KVSTORAGE_CMD_GET;
+ }
+ else if ((c[0] == 's' || c[0] == 'S') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 't' || c[2] == 'T')) {
+ session->command = KVSTORAGE_CMD_SET;
+ }
+ else if ((c[0] == 'd' || c[0] == 'D') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'l' || c[2] == 'L')) {
+ session->command = KVSTORAGE_CMD_DELETE;
+ }
+ else {
+ /* Error */
+ return FALSE;
+ }
+ }
+ else if (len == 4) {
+ if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
+ session->command = KVSTORAGE_CMD_QUIT;
+ }
+ if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
+ session->command = KVSTORAGE_CMD_SYNC;
+ }
+ }
+ else if (len == 6) {
+ if (g_ascii_strncasecmp (c, "delete", 6) == 0) {
+ session->command = KVSTORAGE_CMD_DELETE;
+ }
+ else if (g_ascii_strncasecmp (c, "select", 6) == 0) {
+ session->command = KVSTORAGE_CMD_SELECT;
+ }
+ else {
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
+/**
+ * Parse kvstorage line
+ */
+static gboolean
+parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
{
gchar *p, *c, *end;
gint state = 0, next_state;
@@ -158,9 +209,17 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
next_state = 1;
}
else if (c == p) {
- /* We have some character, so assume id as 0 and parse command */
- session->id = 0;
- state = 1;
+ if (*p != '*') {
+ /* We have some character, so assume id as 0 and parse command */
+ session->id = 0;
+ state = 1;
+ }
+ else {
+ /* In fact it is redis number of commands */
+ c = ++p;
+ state = 7;
+ session->id = 0;
+ }
}
else {
/* We have something wrong here (like some digits and then come non-digits) */
@@ -174,62 +233,24 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
p ++;
}
else {
- if ((g_ascii_isspace (*p) || p == end) && p != c) {
- /* We got some command, try to parse it */
- if (p - c == 3) {
- /* Set or get command */
- if ((c[0] == 'g' || c[0] == 'G') &&
- (c[1] == 'e' || c[1] == 'E') &&
- (c[2] == 't' || c[2] == 'T')) {
- session->command = KVSTORAGE_CMD_GET;
- }
- else if ((c[0] == 's' || c[0] == 'S') &&
- (c[1] == 'e' || c[1] == 'E') &&
- (c[2] == 't' || c[2] == 'T')) {
- session->command = KVSTORAGE_CMD_SET;
- }
- else if ((c[0] == 'd' || c[0] == 'D') &&
- (c[1] == 'e' || c[1] == 'E') &&
- (c[2] == 'l' || c[2] == 'L')) {
- session->command = KVSTORAGE_CMD_DELETE;
- }
- else {
- /* Error */
- return FALSE;
- }
- }
- else if (p - c == 4) {
- if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
- session->command = KVSTORAGE_CMD_QUIT;
- state = 100;
- continue;
- }
- if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
- session->command = KVSTORAGE_CMD_SYNC;
- state = 100;
- continue;
- }
- }
- else if (p - c == 6) {
- if (g_ascii_strncasecmp (c, "delete", 6) == 0) {
- session->command = KVSTORAGE_CMD_DELETE;
- }
- else if (g_ascii_strncasecmp (c, "select", 6) == 0) {
- session->command = KVSTORAGE_CMD_SELECT;
- state = 99;
- next_state = 6;
- continue;
- }
- else {
- return FALSE;
- }
- }
- else {
- return FALSE;
+ if (parse_kvstorage_command (session, c, p - c)) {
+ switch (session->command) {
+
+ case KVSTORAGE_CMD_QUIT:
+ case KVSTORAGE_CMD_SYNC:
+ /* Single argument command */
+ state = 100;
+ break;
+ case KVSTORAGE_CMD_SELECT:
+ /* Select command, read id next */
+ state = 99;
+ next_state = 6;
+ break;
+ default:
+ /* Normal command, read key */
+ state = 99;
+ next_state = 2;
}
- /* Skip spaces and try to parse key */
- state = 99;
- next_state = 2;
}
else {
/* Some error */
@@ -331,6 +352,24 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
}
}
break;
+ case 7:
+ /* Read arguments count */
+ if (g_ascii_isdigit (*p)) {
+ p ++;
+ }
+ else {
+ if (g_ascii_isspace (*p) || end == p) {
+ session->argc = strtoul (c, NULL, 10);
+ session->argnum = 0;
+ state = 100;
+ /* Switch to arglen state */
+ session->state = KVSTORAGE_STATE_READ_ARGLEN;
+ }
+ else {
+ return FALSE;
+ }
+ }
+ break;
case 99:
/* Skip spaces state */
if (g_ascii_isspace (*p)) {
@@ -351,9 +390,214 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
return state == 100;
}
+/* Process normal kvstorage command */
+static gboolean
+kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
+{
+ gint r;
+ gchar outbuf[BUFSIZ];
+ gboolean res;
+ struct rspamd_kv_element *elt;
+
+ if (session->command == KVSTORAGE_CMD_SET) {
+ session->state = KVSTORAGE_STATE_READ_DATA;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length);
+ }
+ else if (session->command == KVSTORAGE_CMD_GET) {
+ g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
+ elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now);
+ if (elt == NULL) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "$-1" CRLF,
+ sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ session->elt = elt;
+
+ if (!is_redis) {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
+ ELT_KEY (elt), elt->flags, elt->size);
+ }
+ else {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
+ elt->size);
+ }
+ if (!rspamd_dispatcher_write (session->dispather, outbuf,
+ r, TRUE, FALSE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ return FALSE;
+ }
+ if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ return FALSE;
+ }
+ if (!is_redis) {
+ res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
+ sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ res = rspamd_dispatcher_write (session->dispather, CRLF,
+ sizeof (CRLF) - 1, FALSE, TRUE);
+ }
+ if (!res) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ }
+
+ return res;
+ }
+ }
+ else if (session->command == KVSTORAGE_CMD_DELETE) {
+ g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+ elt = rspamd_kv_storage_delete (session->cf->storage, session->key);
+ if (elt != NULL) {
+ if ((elt->flags & KV_ELT_DIRTY) == 0) {
+ /* Free memory if backend has deleted this element */
+ g_slice_free1 (ELT_SIZE (elt), elt);
+ }
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
+ sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, ":1" CRLF,
+ sizeof (":1" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, ":0" CRLF,
+ sizeof (":0" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ }
+ else if (session->command == KVSTORAGE_CMD_SYNC) {
+ if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_COMMON,
+ sizeof (ERROR_COMMON) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF,
+ sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF,
+ sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF,
+ sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF,
+ sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ }
+ g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+ }
+ else if (session->command == KVSTORAGE_CMD_SELECT) {
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF,
+ sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else if (session->command == KVSTORAGE_CMD_QUIT) {
+ /* Quit session */
+ free_kvstorage_session (session);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static gboolean
+kvstorage_read_arglen (f_str_t *in, guint *len)
+{
+ gchar *p = in->begin, *end = in->begin + in->len, *c;
+ gint state = 0;
+
+ while (p < end) {
+ switch (state) {
+ case 0:
+ if (*p != '$') {
+ return FALSE;
+ }
+ else {
+ p ++;
+ c = p;
+ state = 1;
+ }
+ break;
+ case 1:
+ if (g_ascii_isdigit (*p) && p != end - 1) {
+ p ++;
+ }
+ else {
+ if (p != end - 1) {
+ return FALSE;
+ }
+ else {
+ *len = strtoul (c, NULL, 10);
+ return TRUE;
+ }
+ }
+ break;
+ }
+ }
+
+ return TRUE;
+}
+
+/*
+ * Check number of arguments for a command
+ */
+static gboolean
+kvstorage_check_argnum (struct kvstorage_session *session)
+{
+ switch (session->command) {
+ case KVSTORAGE_CMD_QUIT:
+ case KVSTORAGE_CMD_SYNC:
+ return session->argc == 1;
+ case KVSTORAGE_CMD_SET:
+ return session->argc == 3;
+ default:
+ return session->argc == 2;
+ }
+
+ /* Unreachable */
+ return FALSE;
+}
+
/**
* Dispatcher callbacks
*/
+
/*
* Callback that is called when there is data to read in buffer
*/
@@ -362,10 +606,10 @@ kvstorage_read_socket (f_str_t * in, void *arg)
{
struct kvstorage_session *session = (struct kvstorage_session *) arg;
struct kvstorage_worker_thread *thr;
- struct rspamd_kv_element *elt;
gint r;
+ guint arglen;
gchar outbuf[BUFSIZ];
- gboolean is_redis, res;
+ gboolean is_redis;
if (in->len == 0) {
/* Skip empty commands */
@@ -378,7 +622,7 @@ kvstorage_read_socket (f_str_t * in, void *arg)
case KVSTORAGE_STATE_READ_CMD:
/* Update timestamp */
session->now = time (NULL);
- if (! parse_kvstorage_command (session, in)) {
+ if (! parse_kvstorage_line (session, in)) {
thr_info ("%ud: unknown command: %V", thr->id, in);
if (!is_redis) {
return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
@@ -403,138 +647,84 @@ kvstorage_read_socket (f_str_t * in, void *arg)
sizeof ("-ERR unknown keystorage" CRLF) - 1, FALSE, TRUE);
}
}
- if (session->command == KVSTORAGE_CMD_SET) {
- session->state = KVSTORAGE_STATE_READ_DATA;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length);
- }
- else if (session->command == KVSTORAGE_CMD_GET) {
- g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
- elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now);
- if (elt == NULL) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
- if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
- sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
- }
- else {
- return rspamd_dispatcher_write (session->dispather, "$-1" CRLF,
- sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
- }
- }
- else {
- session->elt = elt;
-
- if (!is_redis) {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
- ELT_KEY (elt), elt->flags, elt->size);
- }
- else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
- elt->size);
- }
- if (!rspamd_dispatcher_write (session->dispather, outbuf,
- r, TRUE, FALSE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
- return FALSE;
- }
- if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
- return FALSE;
- }
- if (!is_redis) {
- res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
- sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
- }
- else {
- res = rspamd_dispatcher_write (session->dispather, CRLF,
- sizeof (CRLF) - 1, FALSE, TRUE);
- }
- if (!res) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
- }
-
- return res;
- }
+ if (session->state != KVSTORAGE_STATE_READ_ARGLEN) {
+ return kvstorage_process_command (session, is_redis);
}
- else if (session->command == KVSTORAGE_CMD_DELETE) {
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
- elt = rspamd_kv_storage_delete (session->cf->storage, session->key);
- if (elt != NULL) {
- if ((elt->flags & KV_ELT_DIRTY) == 0) {
- /* Free memory if backend has deleted this element */
- g_slice_free1 (ELT_SIZE (elt), elt);
- }
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
- sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
- }
- else {
- return rspamd_dispatcher_write (session->dispather, ":1" CRLF,
- sizeof (":1" CRLF) - 1, FALSE, TRUE);
- }
- }
- else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
- sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
- }
- else {
- return rspamd_dispatcher_write (session->dispather, ":0" CRLF,
- sizeof (":0" CRLF) - 1, FALSE, TRUE);
- }
- }
+ }
+ break;
+ case KVSTORAGE_STATE_READ_ARGLEN:
+ if (! kvstorage_read_arglen (in, &arglen)) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown arglen '%V'" CRLF, in);
+ return rspamd_dispatcher_write (session->dispather, outbuf,
+ r, FALSE, TRUE);
+ }
+ else {
+ session->state = KVSTORAGE_STATE_READ_ARG;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, arglen);
+ }
+ break;
+ case KVSTORAGE_STATE_READ_ARG:
+ if (session->argnum == 0) {
+ /* Read command */
+ if (! parse_kvstorage_command (session, in->begin, in->len)) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in);
+ return rspamd_dispatcher_write (session->dispather, outbuf,
+ r, FALSE, TRUE);
}
- else if (session->command == KVSTORAGE_CMD_SYNC) {
- if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
- if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, ERROR_COMMON,
- sizeof (ERROR_COMMON) - 1, FALSE, TRUE);
- }
- else {
- return rspamd_dispatcher_write (session->dispather, "-ERR unsupported" CRLF,
- sizeof ("-ERR unsupported" CRLF) - 1, FALSE, TRUE);
- }
+ else {
+ if (! kvstorage_check_argnum (session)) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR invalid argnum for command '%V': %ud" CRLF,
+ in, session->argc);
+ return rspamd_dispatcher_write (session->dispather, outbuf,
+ r, FALSE, TRUE);
}
else {
- if (session->cf->storage->backend->sync_func (session->cf->storage->backend)) {
- if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, "SYNCED" CRLF,
- sizeof ("SYNCED" CRLF) - 1, FALSE, TRUE);
- }
- else {
- return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
- sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
- }
- }
- else {
- if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, "NOT_SYNCED" CRLF,
- sizeof ("NOT_SYNCED" CRLF) - 1, FALSE, TRUE);
- }
- else {
- return rspamd_dispatcher_write (session->dispather, "-ERR not synced" CRLF,
- sizeof ("-ERR not synced" CRLF) - 1, FALSE, TRUE);
- }
- }
+ session->argnum ++;
+ session->state = KVSTORAGE_STATE_READ_ARGLEN;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
}
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
}
- else if (session->command == KVSTORAGE_CMD_SELECT) {
- if (!is_redis) {
- return rspamd_dispatcher_write (session->dispather, "SELECTED" CRLF,
- sizeof ("SELECTED" CRLF) - 1, FALSE, TRUE);
+ }
+ else if (session->argnum == 1) {
+ if (session->command != KVSTORAGE_CMD_SELECT) {
+ /* This argument is a key for normal command */
+ session->key = memory_pool_fstrdup (session->pool, in);
+ if (session->argnum == session->argc - 1) {
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ return kvstorage_process_command (session, TRUE);
}
else {
- return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
- sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ session->argnum ++;
+ session->state = KVSTORAGE_STATE_READ_ARGLEN;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
}
}
- else if (session->command == KVSTORAGE_CMD_QUIT) {
- /* Quit session */
- free_kvstorage_session (session);
- return FALSE;
+ else {
+ /* Special case for select command */
+ rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), in->len));
+ session->id = strtoul (outbuf, NULL, 10);
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ return kvstorage_process_command (session, TRUE);
+ }
+ }
+ else if (session->argnum == 2) {
+ /* We get datablock for set command */
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+ if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
+ session->flags, session->expire)) {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
+ sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
}
}
break;
@@ -556,8 +746,14 @@ kvstorage_read_socket (f_str_t * in, void *arg)
}
else {
g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
- sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_STORED,
+ sizeof (ERROR_NOT_STORED) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
+ sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ }
}
break;
diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h
index 2d4a3fa89..a7c8bd8a6 100644
--- a/src/kvstorage_server.h
+++ b/src/kvstorage_server.h
@@ -59,6 +59,8 @@ struct kvstorage_session {
rspamd_io_dispatcher_t *dispather;
enum {
KVSTORAGE_STATE_READ_CMD,
+ KVSTORAGE_STATE_READ_ARGLEN,
+ KVSTORAGE_STATE_READ_ARG,
KVSTORAGE_STATE_READ_DATA
} state;
enum {
@@ -70,6 +72,8 @@ struct kvstorage_session {
KVSTORAGE_CMD_QUIT
} command;
guint id;
+ guint argc;
+ guint argnum;
memory_pool_t *pool;
gchar *key;
struct kvstorage_config *cf;