gint steps = 0;
struct rspamd_kv_element *elt;
gboolean res = TRUE;
+ glong longval;
/* Hard limit */
if (storage->max_memory > 0) {
}
/* Insert elt to the cache */
- elt = storage->cache->insert_func (storage->cache, key, data, len);
- if (elt == NULL) {
- return FALSE;
+
+ /* First of all check element for integer */
+ if (rspamd_strtol (data, len, &longval)) {
+ elt = storage->cache->insert_func (storage->cache, key, &longval, sizeof (glong));
+ if (elt == NULL) {
+ return FALSE;
+ }
+ else {
+ elt->flags |= KV_ELT_INTEGER;
+ }
}
- elt->flags = flags;
- elt->size = len;
+ else {
+ elt = storage->cache->insert_func (storage->cache, key, data, len);
+ if (elt == NULL) {
+ return FALSE;
+ }
+ }
+
+ elt->flags |= flags;
elt->expire = expire;
if (expire == 0) {
elt->flags |= KV_ELT_PERSISTENT;
return res;
}
+/** Increment value in kvstorage */
+gboolean
+rspamd_kv_storage_increment (struct rspamd_kv_storage *storage, gpointer key, glong *value)
+{
+ struct rspamd_kv_element *elt = NULL, *belt;
+ glong *lp;
+
+ /* First try to look at cache */
+ elt = storage->cache->lookup_func (storage->cache, key);
+
+ if (elt == NULL && storage->backend) {
+ belt = storage->backend->lookup_func (storage->backend, key);
+ if (belt) {
+ /* Put this element into cache */
+ if ((belt->flags & KV_ELT_INTEGER) != 0) {
+ rspamd_kv_storage_insert_internal (storage, ELT_KEY (belt), ELT_DATA (belt),
+ belt->size, belt->flags,
+ belt->expire, &elt);
+ }
+ if ((belt->flags & KV_ELT_DIRTY) == 0) {
+ g_free (belt);
+ }
+ }
+ }
+ if (elt && (elt->flags & KV_ELT_INTEGER) != 0) {
+ lp = &ELT_LONG (elt);
+ *lp += *value;
+ *value = *lp;
+ if (storage->backend) {
+ return storage->backend->replace_func (storage->backend, key, elt);
+ }
+ else {
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
/** Lookup an element inside kv storage */
struct rspamd_kv_element*
rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, time_t now)
elt->keylen = keylen;
elt->size = len;
elt->hash = rspamd_strcase_hash (key);
+ elt->flags = 0;
memcpy (elt->data, key, keylen + 1);
memcpy (ELT_DATA (elt), value, len);
g_hash_table_insert (cache->hash, ELT_KEY (elt), elt);
elt->age = time (NULL);
elt->keylen = keylen;
elt->size = len;
+ elt->flags = 0;
elt->hash = rspamd_strcase_hash (key);
memcpy (elt->data, key, keylen + 1);
memcpy (ELT_DATA (elt), value, len);
elt->keylen = keylen;
elt->size = len;
elt->hash = rkey;
+ elt->flags = 0;
memcpy (elt->data, key, keylen + 1);
memcpy (ELT_DATA (elt), value, len);
radix32tree_insert (cache->tree, rkey, 0xffffffff, (uintptr_t)elt);
elt->keylen = keylen;
elt->size = len;
elt->hash = rkey;
+ elt->flags = 0;
memcpy (elt->data, key, keylen + 1);
memcpy (ELT_DATA (elt), value, len);
radix32tree_insert (cache->tree, rkey, 0xffffffff, (uintptr_t)elt);
elt->keylen = keylen;
elt->size = len;
elt->hash = rspamd_strcase_hash (key);
+ elt->flags = 0;
memcpy (elt->data, key, keylen + 1);
memcpy (ELT_DATA (elt), value, len);
JHSI (pelt, cache->judy, ELT_KEY (elt), elt->keylen);
elt->keylen = keylen;
elt->size = len;
elt->hash = rspamd_strcase_hash (key);
+ elt->flags = 0;
memcpy (elt->data, key, keylen + 1);
memcpy (ELT_DATA (elt), value, len);
JHSI (pelt, cache->judy, ELT_KEY (elt), elt->keylen);
}
}
else if (len == 4) {
- if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
+ if ((c[0] == 'i' || c[0] == 'I') &&
+ (c[1] == 'n' || c[1] == 'N') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R')) {
+ session->command = KVSTORAGE_CMD_INCR;
+ session->arg_data.value = 1;
+ }
+ else if ((c[0] == 'd' || c[0] == 'D') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R')) {
+ session->command = KVSTORAGE_CMD_DECR;
+ session->arg_data.value = -1;
+ }
+ else if (g_ascii_strncasecmp (c, "quit", 4) == 0) {
session->command = KVSTORAGE_CMD_QUIT;
}
- if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
+ else if (g_ascii_strncasecmp (c, "sync", 4) == 0 || g_ascii_strncasecmp (c, "save", 4) == 0) {
session->command = KVSTORAGE_CMD_SYNC;
}
}
else if (len == 6) {
- if (g_ascii_strncasecmp (c, "delete", 6) == 0) {
+ if ((c[0] == 'i' || c[0] == 'I') &&
+ (c[1] == 'n' || c[1] == 'N') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R') &&
+ (c[4] == 'b' || c[4] == 'B') &&
+ (c[5] == 'y' || c[5] == 'Y')) {
+ session->command = KVSTORAGE_CMD_INCR;
+ session->arg_data.value = 1;
+ }
+ else if ((c[0] == 'd' || c[0] == 'D') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'c' || c[2] == 'C') &&
+ (c[3] == 'r' || c[3] == 'R') &&
+ (c[4] == 'b' || c[4] == 'B') &&
+ (c[5] == 'y' || c[5] == 'Y')) {
+ session->command = KVSTORAGE_CMD_DECR;
+ session->arg_data.value = -1;
+ }
+ else if ((c[0] == 'd' || c[0] == 'D') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'l' || c[2] == 'L') &&
+ (c[3] == 'e' || c[3] == 'E') &&
+ (c[4] == 't' || c[4] == 'T') &&
+ (c[5] == 'e' || c[5] == 'E')) {
session->command = KVSTORAGE_CMD_DELETE;
}
else if (g_ascii_strncasecmp (c, "select", 6) == 0) {
session->key = memory_pool_alloc (session->pool, p - c + 1);
rspamd_strlcpy (session->key, c, p - c + 1);
/* Now we must select next state based on command */
- if (session->command == KVSTORAGE_CMD_SET) {
+ if (session->command == KVSTORAGE_CMD_SET ||
+ session->command == KVSTORAGE_CMD_INCR ||
+ session->command == KVSTORAGE_CMD_DECR) {
/* Read flags */
state = 99;
if (is_redis) {
session->expire = 0;
}
else {
- next_state = 3;
+ if (session->command == KVSTORAGE_CMD_SET) {
+ next_state = 3;
+ }
+ else {
+ next_state = 5;
+ }
}
}
else {
if (g_ascii_isspace (*p)) {
session->flags = strtoul (c, NULL, 10);
state = 99;
- next_state = 4;
+ if (session->command == KVSTORAGE_CMD_SET) {
+ next_state = 4;
+ }
+ else {
+ /* INCR and DECR */
+ next_state = 5;
+ }
}
else {
return FALSE;
}
break;
case 5:
- /* Read size */
+ /* Read size or incr/decr values */
if (g_ascii_isdigit (*p)) {
p ++;
}
else {
- if (g_ascii_isspace (*p) || end == p) {
- session->length = strtoul (c, NULL, 10);
+ if (g_ascii_isspace (*p) || p >= end - 1) {
+ if (session->command == KVSTORAGE_CMD_SET) {
+ session->arg_data.length = strtoul (c, NULL, 10);
+ }
+ else {
+ if (p != c) {
+ session->arg_data.value = strtoul (c, NULL, 10);
+ if (session->command == KVSTORAGE_CMD_DECR) {
+ session->arg_data.value = -session->arg_data.value;
+ }
+ }
+ else if (session->command == KVSTORAGE_CMD_INCR) {
+ session->arg_data.value = 1;
+ }
+ else {
+ session->arg_data.value = -1;
+ }
+ }
state = 100;
}
else {
kvstorage_process_command (struct kvstorage_session *session, gboolean is_redis)
{
gint r;
- gchar outbuf[BUFSIZ];
+ gchar outbuf[BUFSIZ], intbuf[sizeof ("9223372036854775807")];
gboolean res;
struct rspamd_kv_element *elt;
+ guint eltlen;
+ glong longval;
if (session->command == KVSTORAGE_CMD_SET) {
session->state = KVSTORAGE_STATE_READ_DATA;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->length);
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_CHARACTER, session->arg_data.length);
}
else if (session->command == KVSTORAGE_CMD_GET) {
g_static_rw_lock_reader_lock (&session->cf->storage->rwlock);
}
else {
session->elt = elt;
+ if (elt->flags & KV_ELT_INTEGER) {
+ eltlen = rspamd_snprintf (intbuf, sizeof (intbuf), "%l", ELT_LONG (elt));
+
+ }
+ else {
+ eltlen = elt->size;
+ }
if (!is_redis) {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
- ELT_KEY (elt), elt->flags, elt->size);
+ ELT_KEY (elt), elt->flags, eltlen);
}
else {
r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
- elt->size);
+ eltlen);
}
if (!rspamd_dispatcher_write (session->dispather, outbuf,
r, TRUE, FALSE)) {
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
return FALSE;
}
- if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) {
- g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
- return FALSE;
+ if (elt->flags & KV_ELT_INTEGER) {
+ if (!rspamd_dispatcher_write (session->dispather, intbuf, eltlen, TRUE, TRUE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ return FALSE;
+ }
+ }
+ else {
+ if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), eltlen, TRUE, TRUE)) {
+ g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
+ return FALSE;
+ }
}
if (!is_redis) {
res = rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
}
}
}
+ else if (session->command == KVSTORAGE_CMD_INCR || session->command == KVSTORAGE_CMD_DECR) {
+ g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+ longval = session->arg_data.value;
+ if (!rspamd_kv_storage_increment (session->cf->storage, session->key, &longval)) {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "-ERR not found" CRLF,
+ sizeof ("-ERR not found" CRLF) - 1, FALSE, TRUE);
+ }
+ }
+ else {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ if (!is_redis) {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "%l" CRLF,
+ longval);
+ }
+ else {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%l" CRLF,
+ longval);
+ }
+ if (!rspamd_dispatcher_write (session->dispather, outbuf,
+ r, FALSE, FALSE)) {
+ return FALSE;
+ }
+ }
+ }
else if (session->command == KVSTORAGE_CMD_SYNC) {
if (session->cf->storage->backend == NULL || session->cf->storage->backend->sync_func == NULL) {
if (!is_redis) {
return session->argc == 1;
case KVSTORAGE_CMD_SET:
return session->argc == 3;
+ case KVSTORAGE_CMD_INCR:
+ case KVSTORAGE_CMD_DECR:
+ return session->argc == 1 || session->argc == 2;
default:
return session->argc == 2;
}
r, FALSE, TRUE);
}
else {
- session->argnum ++;
- session->state = KVSTORAGE_STATE_READ_ARGLEN;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ if (session->argnum == session->argc - 1) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ return kvstorage_process_command (session, TRUE);
+ }
+ else {
+ session->argnum ++;
+ session->state = KVSTORAGE_STATE_READ_ARGLEN;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ }
}
}
}
/* This argument is a key for normal command */
session->key = memory_pool_fstrdup (session->pool, in);
if (session->argnum == session->argc - 1) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
return kvstorage_process_command (session, TRUE);
}
}
else {
/* Special case for select command */
+ session->state = KVSTORAGE_STATE_READ_CMD;
rspamd_strlcpy (outbuf, in->begin, MIN (sizeof (outbuf), in->len));
session->id = strtoul (outbuf, NULL, 10);
rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
}
else if (session->argnum == 2) {
/* We get datablock for set command */
- session->state = KVSTORAGE_STATE_READ_CMD;
- rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
- g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
- if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
- session->flags, session->expire)) {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
- sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ if (session->command == KVSTORAGE_CMD_SET) {
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ g_static_rw_lock_writer_lock (&session->cf->storage->rwlock);
+ if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
+ session->flags, session->expire)) {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
+ return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
+ sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ }
}
else {
- g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, "-ERR not stored" CRLF,
- sizeof ("-ERR not stored" CRLF) - 1, FALSE, TRUE);
+ session->state = KVSTORAGE_STATE_READ_CMD;
+ rspamd_strtol (in->begin, in->len, &session->arg_data.value);
+ if (session->command == KVSTORAGE_CMD_DECR) {
+ session->arg_data.value = -session->arg_data.value;
+ }
+ rspamd_set_dispatcher_policy (session->dispather, BUFFER_LINE, -1);
+ return kvstorage_process_command (session, TRUE);
}
}
break;