#define ERROR_NOT_FOUND "NOT_FOUND" CRLF
#define ERROR_INVALID_KEYSTORAGE "CLIENT_ERROR storage does not exists" CRLF
+#define ERROR_REDIS_OK "+OK" CRLF
+
static sig_atomic_t wanna_die = 0;
static sig_atomic_t do_reopen_log = 0;
/* Set default values */
ctx->timeout_raw = 300000;
- register_worker_opt (TYPE_SMTP, "timeout", xml_handle_seconds, ctx,
+ register_worker_opt (TYPE_KVSTORAGE, "timeout", xml_handle_seconds, ctx,
G_STRUCT_OFFSET (struct kvstorage_worker_ctx, timeout_raw));
+ register_worker_opt (TYPE_KVSTORAGE, "redis", xml_handle_boolean, ctx,
+ G_STRUCT_OFFSET (struct kvstorage_worker_ctx, is_redis));
return ctx;
}
{
gchar *p, *c, *end;
gint state = 0, next_state;
+ gboolean is_redis;
p = in->begin;
end = in->begin + in->len;
c = p;
+ is_redis = session->thr->ctx->is_redis;
/* State machine for parsing */
while (p <= end) {
/* We got some command, try to parse it */
if (p - c == 3) {
/* Set or get command */
- if (memcmp (c, "get", 3) == 0) {
+ if ((c[0] == 'g' || c[0] == 'G') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 't' || c[2] == 'T')) {
session->command = KVSTORAGE_CMD_GET;
}
- else if (memcmp (c, "set", 3) == 0) {
+ else if ((c[0] == 's' || c[0] == 'S') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 't' || c[2] == 'T')) {
session->command = KVSTORAGE_CMD_SET;
}
+ else if ((c[0] == 'd' || c[0] == 'D') &&
+ (c[1] == 'e' || c[1] == 'E') &&
+ (c[2] == 'l' || c[2] == 'L')) {
+ session->command = KVSTORAGE_CMD_DELETE;
+ }
else {
/* Error */
return FALSE;
if (session->command == KVSTORAGE_CMD_SET) {
/* Read flags */
state = 99;
- next_state = 3;
+ if (is_redis) {
+ next_state = 5;
+ session->flags = 0;
+ session->expire = 0;
+ }
+ else {
+ next_state = 3;
+ }
}
else {
/* Nothing to read for other commands */
struct rspamd_kv_element *elt;
gint r;
gchar outbuf[BUFSIZ];
+ gboolean is_redis;
if (in->len == 0) {
/* Skip empty commands */
return TRUE;
}
thr = session->thr;
+ is_redis = thr->ctx->is_redis;
+
switch (session->state) {
case KVSTORAGE_STATE_READ_CMD:
/* Update timestamp */
session->now = time (NULL);
if (! parse_kvstorage_command (session, in)) {
thr_info ("%ud: unknown command: %V", thr->id, in);
- return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
- sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_UNKNOWN_COMMAND,
+ sizeof (ERROR_UNKNOWN_COMMAND) - 1, FALSE, TRUE);
+ }
+ else {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "-ERR unknown command '%V'" CRLF, in);
+ return rspamd_dispatcher_write (session->dispather, outbuf,
+ r, FALSE, TRUE);
+ }
}
else {
session->cf = get_kvstorage_config (session->id);
elt = rspamd_kv_storage_lookup (session->cf->storage, session->key, session->now);
g_static_rw_lock_reader_unlock (&session->cf->storage->rwlock);
if (elt == NULL) {
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
- sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "$-1" CRLF,
+ sizeof ("$-1" CRLF) - 1, FALSE, TRUE);
+ }
}
else {
- r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
- ELT_KEY (elt), elt->flags, elt->size);
+ if (!is_redis) {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "VALUE %s %ud %ud" CRLF,
+ ELT_KEY (elt), elt->flags, elt->size);
+ }
+ else {
+ r = rspamd_snprintf (outbuf, sizeof (outbuf), "$%ud" CRLF,
+ elt->size);
+ }
if (!rspamd_dispatcher_write (session->dispather, outbuf,
r, TRUE, FALSE)) {
return FALSE;
if (!rspamd_dispatcher_write (session->dispather, ELT_DATA(elt), elt->size, TRUE, TRUE)) {
return FALSE;
}
- return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
- sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, CRLF "END" CRLF,
+ sizeof (CRLF "END" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, CRLF,
+ sizeof (CRLF) - 1, FALSE, TRUE);
+ }
}
}
else if (session->command == KVSTORAGE_CMD_DELETE) {
g_slice_free1 (ELT_SIZE (elt), elt);
}
g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
- sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "DELETED" CRLF,
+ sizeof ("DELETED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, ":1" CRLF,
+ sizeof (":1" CRLF) - 1, FALSE, TRUE);
+ }
}
else {
g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
- sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, ERROR_NOT_FOUND,
+ sizeof (ERROR_NOT_FOUND) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, ":0" CRLF,
+ sizeof (":0" CRLF) - 1, FALSE, TRUE);
+ }
}
}
else if (session->command == KVSTORAGE_CMD_QUIT) {
if (rspamd_kv_storage_insert (session->cf->storage, session->key, in->begin, in->len,
session->flags, session->expire)) {
g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);
- return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
- sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
+ if (!is_redis) {
+ return rspamd_dispatcher_write (session->dispather, "STORED" CRLF,
+ sizeof ("STORED" CRLF) - 1, FALSE, TRUE);
+ }
+ else {
+ return rspamd_dispatcher_write (session->dispather, "+OK" CRLF,
+ sizeof ("+OK" CRLF) - 1, FALSE, TRUE);
+ }
}
else {
g_static_rw_lock_writer_unlock (&session->cf->storage->rwlock);