diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-11-10 21:03:04 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-11-10 21:03:04 +0300 |
commit | e31d2fac565ae6f32cb5283eb55392344364b453 (patch) | |
tree | afc154180d958c7df69ce4fd3783364065f5500f /src/kvstorage_server.c | |
parent | 4e1470d50f1ece3277dec6c770c5ea74d0db10b7 (diff) | |
download | rspamd-e31d2fac565ae6f32cb5283eb55392344364b453.tar.gz rspamd-e31d2fac565ae6f32cb5283eb55392344364b453.zip |
* Add incr and decr commands to kvstorage.
* Add integers detection inside kvstorage.
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r-- | src/kvstorage_server.c | 196 |
1 files changed, 166 insertions, 30 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 1b542458b..dd0c18b57 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -156,15 +156,52 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len) } } else if (len == 4) { - if (g_ascii_strncasecmp (c, "quit", 4) == 0) { + if ((c[0] == 'i' || c[0] == 'I') && + (c[1] == 'n' || c[1] == 'N') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R')) { + session->command = KVSTORAGE_CMD_INCR; + session->arg_data.value = 1; + } + else if ((c[0] == 'd' || c[0] == 'D') && + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R')) { + session->command = KVSTORAGE_CMD_DECR; + session->arg_data.value = -1; + } + else if (g_ascii_strncasecmp (c, "quit", 4) == 0) { session->command = KVSTORAGE_CMD_QUIT; } - if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) { + else if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) { session->command = KVSTORAGE_CMD_SYNC; } } else if (len == 6) { - if (g_ascii_strncasecmp (c, "delete", 6) == 0) { + if ((c[0] == 'i' || c[0] == 'I') && + (c[1] == 'n' || c[1] == 'N') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R') && + (c[4] == 'b' || c[4] == 'B') && + (c[5] == 'y' || c[5] == 'Y')) { + session->command = KVSTORAGE_CMD_INCR; + session->arg_data.value = 1; + } + else if ((c[0] == 'd' || c[0] == 'D') && + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'c' || c[2] == 'C') && + (c[3] == 'r' || c[3] == 'R') && + (c[4] == 'b' || c[4] == 'B') && + (c[5] == 'y' || c[5] == 'Y')) { + session->command = KVSTORAGE_CMD_DECR; + session->arg_data.value = -1; + } + else if ((c[0] == 'd' || c[0] == 'D') && + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'l' || c[2] == 'L') && + (c[3] == 'e' || c[3] == 'E') && + (c[4] == 't' || c[4] == 'T') && + (c[5] == 'e' || c[5] == 'E')) { session->command = KVSTORAGE_CMD_DELETE; } else if (g_ascii_strncasecmp (c, "select", 6) == 0) { @@ -271,7 +308,9 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) session->key = memory_pool_alloc (session->pool, p - c + 1); rspamd_strlcpy (session->key, c, p - c + 1); /* Now we must select next state based on command */ - if (session->command == KVSTORAGE_CMD_SET) { + if (session->command == KVSTORAGE_CMD_SET || + session->command == KVSTORAGE_CMD_INCR || + session->command == KVSTORAGE_CMD_DECR) { /* Read flags */ state = 99; if (is_redis) { @@ -280,7 +319,12 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) session->expire = 0; } else { - next_state = 3; + if (session->command == KVSTORAGE_CMD_SET) { + next_state = 3; + } + else { + next_state = 5; + } } } else { @@ -299,7 +343,13 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) if (g_ascii_isspace (*p)) { session->flags = strtoul (c, NULL, 10); state = 99; - next_state = 4; + if (session->command == KVSTORAGE_CMD_SET) { + next_state = 4; + } + else { + /* INCR and DECR */ + next_state = 5; + } } else { return FALSE; @@ -323,13 +373,29 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in) } break; case 5: - /* Read size */ + /* Read size or incr/decr values */ if (g_ascii_isdigit (*p)) { p ++; } else { - if (g_ascii_isspace (*p) || end == p) { - session->length = strtoul (c, NULL, 10); + if (g_ascii_isspace (*p) || p >= end - 1) { + if (session->command == KVSTORAGE_CMD_SET) { + session->arg_data.length = strtoul (c, NULL, 10); + } + else { + if (p != c) { + session->arg_data.value = strtoul (c, NULL, 10); + if (session->command == KVSTORAGE_CMD_DECR) { + session->arg_data.value = -session->arg_data.value; + } + } + else if (session->command == KVSTORAGE_CMD_INCR) { + session->arg_data.value = 1; + } + else { + session->arg_data.value = -1; + } + } state = 100; } else { @@ -395,13 +461,15 @@ static gboolean kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) { gint r; - gchar outbuf[BUFSIZ]; + gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")]; gboolean res; struct rspamd_kv_element *elt; + guint eltlen; + glong longval; if (session->command == KVSTORAGE_CMD_SET) { session->state = KVSTORAGE_STATE_READ_DATA; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length); + rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length); } else if (session->command == KVSTORAGE_CMD_GET) { g_static_rw_lock_reader_lock (&session->cf->storage->rwlock); @@ -419,23 +487,38 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } else { session->elt = elt; + if (elt->flags & KV_ELT_INTEGER) { + eltlen = rspamd_snprintf (intbuf, sizeof (intbuf), "%l", ELT_LONG (elt)); + + } + else { + eltlen = elt->size; + } if (!is_redis) { r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF, - ELT_KEY (elt), elt->flags, elt->size); + ELT_KEY (elt), elt->flags, eltlen); } else { r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF, - elt->size); + eltlen); } if (!rspamd_dispatcher_write (session->dispather, outbuf, r, TRUE, FALSE)) { g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); return FALSE; } - if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) { - g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); - return FALSE; + if (elt->flags & KV_ELT_INTEGER) { + if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) { + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + return FALSE; + } + } + else { + if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) { + g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); + return FALSE; + } } if (!is_redis) { res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF, @@ -482,6 +565,36 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis) } } } + else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) { + g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); + longval = session->arg_data.value; + if (!rspamd_kv_storage_increment (session->cf->storage, session->key, &longval)) { + g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "-ERR not found" CRLF, + sizeof ("-ERR not found" CRLF) - 1, FALSE, TRUE); + } + } + else { + g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); + if (!is_redis) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF, + longval); + } + else { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%l" CRLF, + longval); + } + if (!rspamd_dispatcher_write (session->dispather, outbuf, + r, FALSE, FALSE)) { + return FALSE; + } + } + } else if (session->command == KVSTORAGE_CMD_SYNC) { if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) { if (!is_redis) { @@ -586,6 +699,9 @@ kvstorage_check_argnum (struct kvstorage_session *session) return session->argc == 1; case KVSTORAGE_CMD_SET: return session->argc == 3; + case KVSTORAGE_CMD_INCR: + case KVSTORAGE_CMD_DECR: + return session->argc == 1 || session->argc == 2; default: return session->argc == 2; } @@ -682,9 +798,16 @@ kvstorage_read_socket (f_str_t * in, void *arg) r, FALSE, TRUE); } else { - session->argnum ++; - session->state = KVSTORAGE_STATE_READ_ARGLEN; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + if (session->argnum == session->argc - 1) { + session->state = KVSTORAGE_STATE_READ_CMD; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + return kvstorage_process_command (session, TRUE); + } + else { + session->argnum ++; + session->state = KVSTORAGE_STATE_READ_ARGLEN; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + } } } } @@ -693,6 +816,7 @@ kvstorage_read_socket (f_str_t * in, void *arg) /* This argument is a key for normal command */ session->key = memory_pool_fstrdup (session->pool, in); if (session->argnum == session->argc - 1) { + session->state = KVSTORAGE_STATE_READ_CMD; rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); return kvstorage_process_command (session, TRUE); } @@ -704,6 +828,7 @@ kvstorage_read_socket (f_str_t * in, void *arg) } else { /* Special case for select command */ + session->state = KVSTORAGE_STATE_READ_CMD; rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), in->len)); session->id = strtoul (outbuf, NULL, 10); rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); @@ -712,19 +837,30 @@ kvstorage_read_socket (f_str_t * in, void *arg) } else if (session->argnum == 2) { /* We get datablock for set command */ - session->state = KVSTORAGE_STATE_READ_CMD; - rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); - g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); - if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len, - session->flags, session->expire)) { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); - return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, - sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + if (session->command == KVSTORAGE_CMD_SET) { + session->state = KVSTORAGE_STATE_READ_CMD; + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + g_static_rw_lock_writer_lock (&session->cf->storage->rwlock); + if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len, + session->flags, session->expire)) { + g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + } + else { + g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); + return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, + sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); + } } else { - g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); - return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF, - sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE); + session->state = KVSTORAGE_STATE_READ_CMD; + rspamd_strtol (in->begin, in->len, &session->arg_data.value); + if (session->command == KVSTORAGE_CMD_DECR) { + session->arg_data.value = -session->arg_data.value; + } + rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1); + return kvstorage_process_command (session, TRUE); } } break; |