aboutsummaryrefslogtreecommitdiffstats
path: root/src/kvstorage_server.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-10 21:03:04 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-10 21:03:04 +0300
commite31d2fac565ae6f32cb5283eb55392344364b453 (patch)
treeafc154180d958c7df69ce4fd3783364065f5500f /src/kvstorage_server.c
parent4e1470d50f1ece3277dec6c770c5ea74d0db10b7 (diff)
downloadrspamd-e31d2fac565ae6f32cb5283eb55392344364b453.tar.gz
rspamd-e31d2fac565ae6f32cb5283eb55392344364b453.zip
* Add incr and decr commands to kvstorage.
* Add integers detection inside kvstorage.
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r--src/kvstorage_server.c196
1 files changed, 166 insertions, 30 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 1b542458b..dd0c18b57 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -156,15 +156,52 @@ parse_kvstorage_command (struct kvstorage_session *session, gchar *c, guint len)
}
}
else if (len == 4) {
- if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
+ if ((c[0] == 'i' || c[0] == 'I') &&
+ (c[1] == 'n' || c[1] == 'N') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R')) {
+ session->command = KVSTORAGE_CMD_INCR;
+ session->arg_data.value = 1;
+ }
+ else if ((c[0] == 'd' || c[0] == 'D') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R')) {
+ session->command = KVSTORAGE_CMD_DECR;
+ session->arg_data.value = -1;
+ }
+ else if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
session->command = KVSTORAGE_CMD_QUIT;
}
- if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
+ else if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
session->command = KVSTORAGE_CMD_SYNC;
}
}
else if (len == 6) {
- if (g_ascii_strncasecmp (c, "delete", 6) == 0) {
+ if ((c[0] == 'i' || c[0] == 'I') &&
+ (c[1] == 'n' || c[1] == 'N') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R') &&
+ (c[4] == 'b' || c[4] == 'B') &&
+ (c[5] == 'y' || c[5] == 'Y')) {
+ session->command = KVSTORAGE_CMD_INCR;
+ session->arg_data.value = 1;
+ }
+ else if ((c[0] == 'd' || c[0] == 'D') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R') &&
+ (c[4] == 'b' || c[4] == 'B') &&
+ (c[5] == 'y' || c[5] == 'Y')) {
+ session->command = KVSTORAGE_CMD_DECR;
+ session->arg_data.value = -1;
+ }
+ else if ((c[0] == 'd' || c[0] == 'D') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'l' || c[2] == 'L') &&
+ (c[3] == 'e' || c[3] == 'E') &&
+ (c[4] == 't' || c[4] == 'T') &&
+ (c[5] == 'e' || c[5] == 'E')) {
session->command = KVSTORAGE_CMD_DELETE;
}
else if (g_ascii_strncasecmp (c, "select", 6) == 0) {
@@ -271,7 +308,9 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
session->key = memory_pool_alloc (session->pool, p - c + 1);
rspamd_strlcpy (session->key, c, p - c + 1);
/* Now we must select next state based on command */
- if (session->command == KVSTORAGE_CMD_SET) {
+ if (session->command == KVSTORAGE_CMD_SET ||
+ session->command == KVSTORAGE_CMD_INCR ||
+ session->command == KVSTORAGE_CMD_DECR) {
/* Read flags */
state = 99;
if (is_redis) {
@@ -280,7 +319,12 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
session->expire = 0;
}
else {
- next_state = 3;
+ if (session->command == KVSTORAGE_CMD_SET) {
+ next_state = 3;
+ }
+ else {
+ next_state = 5;
+ }
}
}
else {
@@ -299,7 +343,13 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
if (g_ascii_isspace (*p)) {
session->flags = strtoul (c, NULL, 10);
state = 99;
- next_state = 4;
+ if (session->command == KVSTORAGE_CMD_SET) {
+ next_state = 4;
+ }
+ else {
+ /* INCR and DECR */
+ next_state = 5;
+ }
}
else {
return FALSE;
@@ -323,13 +373,29 @@ parse_kvstorage_line (struct kvstorage_session *session, f_str_t *in)
}
break;
case 5:
- /* Read size */
+ /* Read size or incr/decr values */
if (g_ascii_isdigit (*p)) {
p ++;
}
else {
- if (g_ascii_isspace (*p) || end == p) {
- session->length = strtoul (c, NULL, 10);
+ if (g_ascii_isspace (*p) || p >= end - 1) {
+ if (session->command == KVSTORAGE_CMD_SET) {
+ session->arg_data.length = strtoul (c, NULL, 10);
+ }
+ else {
+ if (p != c) {
+ session->arg_data.value = strtoul (c, NULL, 10);
+ if (session->command == KVSTORAGE_CMD_DECR) {
+ session->arg_data.value = -session->arg_data.value;
+ }
+ }
+ else if (session->command == KVSTORAGE_CMD_INCR) {
+ session->arg_data.value = 1;
+ }
+ else {
+ session->arg_data.value = -1;
+ }
+ }
state = 100;
}
else {
@@ -395,13 +461,15 @@ static gboolean
kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
{
gint r;
- gchar outbuf[BUFSIZ];
+ gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")];
gboolean res;
struct rspamd_kv_element *elt;
+ guint eltlen;
+ glong longval;
if (session->command == KVSTORAGE_CMD_SET) {
session->state = KVSTORAGE_STATE_READ_DATA;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length);
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length);
}
else if (session->command == KVSTORAGE_CMD_GET) {
g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
@@ -419,23 +487,38 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
else {
session->elt = elt;
+ if (elt->flags & KV_ELT_INTEGER) {
+ eltlen = rspamd_snprintf (intbuf, sizeof (intbuf), "%l", ELT_LONG (elt));
+
+ }
+ else {
+ eltlen = elt->size;
+ }
if (!is_redis) {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
- ELT_KEY (elt), elt->flags, elt->size);
+ ELT_KEY (elt), elt->flags, eltlen);
}
else {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
- elt->size);
+ eltlen);
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
r, TRUE, FALSE)) {
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
- if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
- return FALSE;
+ if (elt->flags & KV_ELT_INTEGER) {
+ if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ return FALSE;
+ }
+ }
+ else {
+ if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ return FALSE;
+ }
}
if (!is_redis) {
res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
@@ -482,6 +565,36 @@ kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
}
}
}
+ else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) {
+ g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+ longval = session->arg_data.value;
+ if (!rspamd_kv_storage_increment (session->cf->storage, session->key, &longval)) {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR not found" CRLF,
+ sizeof ("-ERR not found" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ if (!is_redis) {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF,
+ longval);
+ }
+ else {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%l" CRLF,
+ longval);
+ }
+ if (!rspamd_dispatcher_write (session->dispather, outbuf,
+ r, FALSE, FALSE)) {
+ return FALSE;
+ }
+ }
+ }
else if (session->command == KVSTORAGE_CMD_SYNC) {
if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
if (!is_redis) {
@@ -586,6 +699,9 @@ kvstorage_check_argnum (struct kvstorage_session *session)
return session->argc == 1;
case KVSTORAGE_CMD_SET:
return session->argc == 3;
+ case KVSTORAGE_CMD_INCR:
+ case KVSTORAGE_CMD_DECR:
+ return session->argc == 1 || session->argc == 2;
default:
return session->argc == 2;
}
@@ -682,9 +798,16 @@ kvstorage_read_socket (f_str_t * in, void *arg)
r, FALSE, TRUE);
}
else {
- session->argnum ++;
- session->state = KVSTORAGE_STATE_READ_ARGLEN;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ if (session->argnum == session->argc - 1) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ return kvstorage_process_command (session, TRUE);
+ }
+ else {
+ session->argnum ++;
+ session->state = KVSTORAGE_STATE_READ_ARGLEN;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ }
}
}
}
@@ -693,6 +816,7 @@ kvstorage_read_socket (f_str_t * in, void *arg)
/* This argument is a key for normal command */
session->key = memory_pool_fstrdup (session->pool, in);
if (session->argnum == session->argc - 1) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
return kvstorage_process_command (session, TRUE);
}
@@ -704,6 +828,7 @@ kvstorage_read_socket (f_str_t * in, void *arg)
}
else {
/* Special case for select command */
+ session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), in->len));
session->id = strtoul (outbuf, NULL, 10);
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
@@ -712,19 +837,30 @@ kvstorage_read_socket (f_str_t * in, void *arg)
}
else if (session->argnum == 2) {
/* We get datablock for set command */
- session->state = KVSTORAGE_STATE_READ_CMD;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
- if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
- session->flags, session->expire)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
- sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ if (session->command == KVSTORAGE_CMD_SET) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+ if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
+ session->flags, session->expire)) {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
+ sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ }
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
- sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_strtol (in->begin, in->len, &session->arg_data.value);
+ if (session->command == KVSTORAGE_CMD_DECR) {
+ session->arg_data.value = -session->arg_data.value;
+ }
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ return kvstorage_process_command (session, TRUE);
}
}
break;