aboutsummaryrefslogtreecommitdiffstats
path: root/src/kvstorage_server.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-04 18:48:07 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-11-04 18:48:07 +0300
commit4c653d32a0d9d1a36c4638db15cecb7e526e4449 (patch)
tree694c738f311d6d53fb905ba39b78edb6773c0d47 /src/kvstorage_server.c
parent0825c58337d3fd8f766eaf0a77961bbe3ec09c35 (diff)
downloadrspamd-4c653d32a0d9d1a36c4638db15cecb7e526e4449.tar.gz
rspamd-4c653d32a0d9d1a36c4638db15cecb7e526e4449.zip
* Support redis API emulation
Diffstat (limited to 'src/kvstorage_server.c')
-rw-r--r--src/kvstorage_server.c104
1 files changed, 86 insertions, 18 deletions
diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c
index 0ac84f7a3..c6190454c 100644
--- a/src/kvstorage_server.c
+++ b/src/kvstorage_server.c
@@ -37,6 +37,8 @@
#define ERROR_NOT_FOUND "NOT_FOUND" CRLF
#define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF
+#define ERROR_REDIS_OK "+OK" CRLF
+
static sig_atomic_t wanna_die = 0;
static sig_atomic_t do_reopen_log = 0;
@@ -94,8 +96,10 @@ init_kvstorage_worker (void)
/* Set default values */
ctx->timeout_raw = 300000;
- register_worker_opt (TYPE_SMTP, "timeout", xml_handle_seconds, ctx,
+ register_worker_opt (TYPE_KVSTORAGE, "timeout", xml_handle_seconds, ctx,
G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw));
+ register_worker_opt (TYPE_KVSTORAGE, "redis", xml_handle_boolean, ctx,
+ G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis));
return ctx;
}
@@ -131,10 +135,12 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
{
gchar *p, *c, *end;
gint state = 0, next_state;
+ gboolean is_redis;
p = in->begin;
end = in->begin + in->len;
c = p;
+ is_redis = session->thr->ctx->is_redis;
/* State machine for parsing */
while (p <= end) {
@@ -172,12 +178,21 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
/* We got some command, try to parse it */
if (p - c == 3) {
/* Set or get command */
- if (memcmp (c, "get", 3) == 0) {
+ if ((c[0] == 'g' || c[0] == 'G') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 't' || c[2] == 'T')) {
session->command = KVSTORAGE_CMD_GET;
}
- else if (memcmp (c, "set", 3) == 0) {
+ else if ((c[0] == 's' || c[0] == 'S') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 't' || c[2] == 'T')) {
session->command = KVSTORAGE_CMD_SET;
}
+ else if ((c[0] == 'd' || c[0] == 'D') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'l' || c[2] == 'L')) {
+ session->command = KVSTORAGE_CMD_DELETE;
+ }
else {
/* Error */
return FALSE;
@@ -227,7 +242,14 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in)
if (session->command == KVSTORAGE_CMD_SET) {
/* Read flags */
state = 99;
- next_state = 3;
+ if (is_redis) {
+ next_state = 5;
+ session->flags = 0;
+ session->expire = 0;
+ }
+ else {
+ next_state = 3;
+ }
}
else {
/* Nothing to read for other commands */
@@ -317,20 +339,30 @@ kvstorage_read_socket (f_str_t * in, void *arg)
struct rspamd_kv_element *elt;
gint r;
gchar outbuf[BUFSIZ];
+ gboolean is_redis;
if (in->len == 0) {
/* Skip empty commands */
return TRUE;
}
thr = session->thr;
+ is_redis = thr->ctx->is_redis;
+
switch (session->state) {
case KVSTORAGE_STATE_READ_CMD:
/* Update timestamp */
session->now = time (NULL);
if (! parse_kvstorage_command (session, in)) {
thr_info ("%ud: unknown command: %V", thr->id, in);
- return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
- sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
+ sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE);
+ }
+ else {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in);
+ return rspamd_dispatcher_write (session->dispather, outbuf,
+ r, FALSE, TRUE);
+ }
}
else {
session->cf = get_kvstorage_config (session->id);
@@ -348,12 +380,24 @@ kvstorage_read_socket (f_str_t * in, void *arg)
elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now);
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
if (elt == NULL) {
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
- sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "$-1" CRLF,
+ sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
+ }
}
else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
- ELT_KEY (elt), elt->flags, elt->size);
+ if (!is_redis) {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
+ ELT_KEY (elt), elt->flags, elt->size);
+ }
+ else {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
+ elt->size);
+ }
if (!rspamd_dispatcher_write (session->dispather, outbuf,
r, TRUE, FALSE)) {
return FALSE;
@@ -361,8 +405,14 @@ kvstorage_read_socket (f_str_t * in, void *arg)
if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) {
return FALSE;
}
- return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
- sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
+ sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, CRLF,
+ sizeof (CRLF) - 1, FALSE, TRUE);
+ }
}
}
else if (session->command == KVSTORAGE_CMD_DELETE) {
@@ -374,13 +424,25 @@ kvstorage_read_socket (f_str_t * in, void *arg)
g_slice_free1 (ELT_SIZE (elt), elt);
}
g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
- sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
+ sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, ":1" CRLF,
+ sizeof (":1" CRLF) - 1, FALSE, TRUE);
+ }
}
else {
g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
- sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, ":0" CRLF,
+ sizeof (":0" CRLF) - 1, FALSE, TRUE);
+ }
}
}
else if (session->command == KVSTORAGE_CMD_QUIT) {
@@ -397,8 +459,14 @@ kvstorage_read_socket (f_str_t * in, void *arg)
if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
session->flags, session->expire)) {
g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
- sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
+ sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
}
else {
g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);