From: Vsevolod Stakhov Date: Fri, 4 Nov 2011 15:48:07 +0000 (+0300) Subject: * Support redis API emulation X-Git-Tag: 0.4.5~24 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=4c653d32a0d9d1a36c4638db15cecb7e526e4449;p=rspamd.git * Support redis API emulation --- diff --git a/src/kvstorage.c b/src/kvstorage.c index f5b314e29..afab4a83b 100644 --- a/src/kvstorage.c +++ b/src/kvstorage.c @@ -259,7 +259,7 @@ rspamd_kv_storage_lookup (struct rspamd_kv_storage *storage, gpointer key, time_ } } - if (elt && (elt->flags & KV_ELT_PERSISTENT) == 0) { + if (elt && (elt->flags & KV_ELT_PERSISTENT) == 0 && elt->expire > 0) { /* Check expiration */ if (now - elt->age > elt->expire) { rspamd_kv_storage_delete (storage, key); diff --git a/src/kvstorage_server.c b/src/kvstorage_server.c index 0ac84f7a3..c6190454c 100644 --- a/src/kvstorage_server.c +++ b/src/kvstorage_server.c @@ -37,6 +37,8 @@ #define ERROR_NOT_FOUND "NOT_FOUND" CRLF #define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF +#define ERROR_REDIS_OK "+OK" CRLF + static sig_atomic_t wanna_die = 0; static sig_atomic_t do_reopen_log = 0; @@ -94,8 +96,10 @@ init_kvstorage_worker (void) /* Set default values */ ctx->timeout_raw = 300000; - register_worker_opt (TYPE_SMTP, "timeout", xml_handle_seconds, ctx, + register_worker_opt (TYPE_KVSTORAGE, "timeout", xml_handle_seconds, ctx, G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw)); + register_worker_opt (TYPE_KVSTORAGE, "redis", xml_handle_boolean, ctx, + G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis)); return ctx; } @@ -131,10 +135,12 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) { gchar *p, *c, *end; gint state = 0, next_state; + gboolean is_redis; p = in->begin; end = in->begin + in->len; c = p; + is_redis = session->thr->ctx->is_redis; /* State machine for parsing */ while (p <= end) { @@ -172,12 +178,21 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) /* We got some command, try to parse it */ if (p - c == 3) { /* Set or get command */ - if (memcmp (c, "get", 3) == 0) { + if ((c[0] == 'g' || c[0] == 'G') && + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 't' || c[2] == 'T')) { session->command = KVSTORAGE_CMD_GET; } - else if (memcmp (c, "set", 3) == 0) { + else if ((c[0] == 's' || c[0] == 'S') && + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 't' || c[2] == 'T')) { session->command = KVSTORAGE_CMD_SET; } + else if ((c[0] == 'd' || c[0] == 'D') && + (c[1] == 'e' || c[1] == 'E') && + (c[2] == 'l' || c[2] == 'L')) { + session->command = KVSTORAGE_CMD_DELETE; + } else { /* Error */ return FALSE; @@ -227,7 +242,14 @@ parse_kvstorage_command (struct kvstorage_session *session, f_str_t *in) if (session->command == KVSTORAGE_CMD_SET) { /* Read flags */ state = 99; - next_state = 3; + if (is_redis) { + next_state = 5; + session->flags = 0; + session->expire = 0; + } + else { + next_state = 3; + } } else { /* Nothing to read for other commands */ @@ -317,20 +339,30 @@ kvstorage_read_socket (f_str_t * in, void *arg) struct rspamd_kv_element *elt; gint r; gchar outbuf[BUFSIZ]; + gboolean is_redis; if (in->len == 0) { /* Skip empty commands */ return TRUE; } thr = session->thr; + is_redis = thr->ctx->is_redis; + switch (session->state) { case KVSTORAGE_STATE_READ_CMD: /* Update timestamp */ session->now = time (NULL); if (! parse_kvstorage_command (session, in)) { thr_info ("%ud: unknown command: %V", thr->id, in); - return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND, - sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND, + sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE); + } + else { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in); + return rspamd_dispatcher_write (session->dispather, outbuf, + r, FALSE, TRUE); + } } else { session->cf = get_kvstorage_config (session->id); @@ -348,12 +380,24 @@ kvstorage_read_socket (f_str_t * in, void *arg) elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now); g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock); if (elt == NULL) { - return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "$-1" CRLF, + sizeof ("$-1" CRLF) - 1, FALSE, TRUE); + } } else { - r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF, - ELT_KEY (elt), elt->flags, elt->size); + if (!is_redis) { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF, + ELT_KEY (elt), elt->flags, elt->size); + } + else { + r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF, + elt->size); + } if (!rspamd_dispatcher_write (session->dispather, outbuf, r, TRUE, FALSE)) { return FALSE; @@ -361,8 +405,14 @@ kvstorage_read_socket (f_str_t * in, void *arg) if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) { return FALSE; } - return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF, - sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF, + sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, CRLF, + sizeof (CRLF) - 1, FALSE, TRUE); + } } } else if (session->command == KVSTORAGE_CMD_DELETE) { @@ -374,13 +424,25 @@ kvstorage_read_socket (f_str_t * in, void *arg) g_slice_free1 (ELT_SIZE (elt), elt); } g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); - return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF, - sizeof ("DELETED" CRLF) - 1, FALSE, TRUE); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF, + sizeof ("DELETED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, ":1" CRLF, + sizeof (":1" CRLF) - 1, FALSE, TRUE); + } } else { g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); - return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, - sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND, + sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, ":0" CRLF, + sizeof (":0" CRLF) - 1, FALSE, TRUE); + } } } else if (session->command == KVSTORAGE_CMD_QUIT) { @@ -397,8 +459,14 @@ kvstorage_read_socket (f_str_t * in, void *arg) if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len, session->flags, session->expire)) { g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); - return rspamd_dispatcher_write (session->dispather, "STORED" CRLF, - sizeof ("STORED" CRLF) - 1, FALSE, TRUE); + if (!is_redis) { + return rspamd_dispatcher_write (session->dispather, "STORED" CRLF, + sizeof ("STORED" CRLF) - 1, FALSE, TRUE); + } + else { + return rspamd_dispatcher_write (session->dispather, "+OK" CRLF, + sizeof ("+OK" CRLF) - 1, FALSE, TRUE); + } } else { g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock); diff --git a/src/kvstorage_server.h b/src/kvstorage_server.h index 808ead805..b513d33ab 100644 --- a/src/kvstorage_server.h +++ b/src/kvstorage_server.h @@ -35,6 +35,7 @@ struct kvstorage_worker_ctx { guint32 timeout_raw; GList *threads; gint s_pair[2]; + gboolean is_redis; memory_pool_t *pool; struct event_base *ev_base; GStaticMutex log_mtx;