]> source.dussan.org Git - rspamd.git/commitdiff
* Implement initial version of asynced kvstorage API
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 24 Oct 2011 17:47:39 +0000 (20:47 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 24 Oct 2011 17:47:39 +0000 (20:47 +0300)
lib/kvstorage/libkvstorageclient.c

index 7bbb9732db70cfd825bb9e0307d6765eee1d38d7..4d3b17dd3ea8bf473898e114fb825f5b351d4a35 100644 (file)
@@ -45,16 +45,25 @@ struct rspamd_kvstorage_connection {
                KV_STATE_GET,
                KV_STATE_WRITE_DATA,
                KV_STATE_READ_DATA,
+               KV_STATE_READ_ELT,
                KV_STATE_READ_REPLY
        } state;
        struct event ev;
        kvstorage_connect_cb conn_cb;
        kvstorage_read_cb read_cb;
        kvstorage_write_cb write_cb;
-       gpointer cur_ud;
        memory_pool_t *pool;
-       gpointer cur_key;
-       gpointer cur_value;
+};
+
+struct rspamd_kvstorage_async_data {
+       struct rspamd_kvstorage_connection *c;
+       struct kvstorage_buf *buf;
+       gchar *key;
+       guint keylen;
+       gpointer data;
+       guint datalen;
+       guint expire;
+       gpointer ud;
 };
 
 /*
@@ -329,45 +338,420 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
 static void
 rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud)
 {
-       struct rspamd_kvstorage_connection              *c = ud;
+       struct rspamd_kvstorage_async_data              *d = ud;
        kvstorage_connect_cb                     cb;
 
-       cb = (kvstorage_connect_cb)c->conn_cb;
+       cb = (kvstorage_connect_cb)d->c->conn_cb;
 
        if (what == EV_TIMEOUT) {
-               cb (KVSTORAGE_ERROR_TIMEOUT, c, c->cur_ud);
+               cb (KVSTORAGE_ERROR_TIMEOUT, d->c, d->ud);
        }
        else {
-               cb (KVSTORAGE_ERROR_OK, c, c->cur_ud);
+               cb (KVSTORAGE_ERROR_OK, d->c, d->ud);
        }
-       c->state = KV_STATE_CONNECTED;
+       d->c->state = KV_STATE_CONNECTED;
 }
 
 static void
 rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
 {
-       struct rspamd_kvstorage_connection              *c = ud;
-       kvstorage_read_cb                                cb;
+       struct rspamd_kvstorage_async_data              *d = ud;
+       kvstorage_read_cb                        cb;
+       guint                                                                    buflen, flags;
+       gint                                                                     r;
+       struct kvstorage_buf                                    *databuf;
 
-       cb = (kvstorage_read_cb)c->read_cb;
+       cb = (kvstorage_read_cb)d->c->read_cb;
+
+       if (what == EV_TIMEOUT) {
+               cb (KVSTORAGE_ERROR_TIMEOUT, d->key, NULL, 0, d->c, d->ud);
+               return;
+       }
+       if (d->c->state == KV_STATE_GET) {
+               /* Create, fill and write buffer */
+               buflen = d->keylen + sizeof ("get " CRLF);
+               d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool);
+
+               r = rspamd_snprintf (d->buf->data, d->buf->len, "get %*s" CRLF,
+                               d->keylen, d->key);
+               d->buf->len = r;
+               r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+               if (r > 0) {
+                       /* Write more data at next call of this function */
+                       d->c->state = KV_STATE_WRITE_DATA;
+                       /* Event magic */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_read_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       /* We have written everything */
+                       d->c->state = KV_STATE_READ_ELT;
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else {
+                       /* Error occured during writing */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+               }
+       }
+       else if (d->c->state == KV_STATE_WRITE_DATA) {
+               r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+               if (r > 0) {
+                       /* Write more data at next call of this function */
+                       d->c->state = KV_STATE_WRITE_DATA;
+                       /* Event magic */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_read_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       /* We have written everything */
+                       d->c->state = KV_STATE_READ_ELT;
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else {
+                       /* Error occured during writing */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+               }
+       }
+       else if (d->c->state == KV_STATE_READ_ELT) {
+               /* Read element info */
+               r = rspamd_kvstorage_buf_readline (d->buf, d->c);
+               if (r == EAGAIN) {
+                       /* Read more data */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       /* Got all data about elt */
+                       if ((r = rspamd_kvstorage_parse_get_line (d->buf, &d->datalen, &flags)) != KVSTORAGE_ERROR_OK) {
+                               cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+                               return;
+                       }
+                       rspamd_kvstorage_buf_drainline (d->buf);
+                       /* Now allocate and read the data */
+                       databuf = rspamd_kvstorage_buf_create (d->datalen, d->c->pool);
+                       memcpy (databuf->data, d->buf->data, d->buf->pos);
+                       d->buf = databuf;
+                       d->c->state = KV_STATE_READ_DATA;
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else {
+                       /* Error occured during reading reply line */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+               }
+       }
+       else if (d->c->state == KV_STATE_READ_DATA) {
+               /* Read data to the buffer */
+               r = rspamd_kvstorage_buf_readall (d->buf, d->c);
+               if (r == 0) {
+                       /* All data read, read the last line */
+                       d->c->state = KV_STATE_READ_REPLY;
+                       /* Save databuf */
+                       d->data = d->buf->data;
+                       d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r > 0) {
+                       /* Read more data into buffer */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else {
+                       /* Error occured */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+               }
+       }
+       else if (d->c->state == KV_STATE_READ_REPLY) {
+               /* Got something from server, try to read line */
+               r = rspamd_kvstorage_buf_readline (d->buf, d->c);
+               if (r == EAGAIN) {
+                       /* Read more data */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->data, d->datalen, d->c, d->ud);
+               }
+               else {
+                       /* Error occured during reading reply line */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+               }
+       }
 }
 
 static void
 rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
 {
-       struct rspamd_kvstorage_connection              *c = ud;
+       struct rspamd_kvstorage_async_data              *d = ud;
        kvstorage_write_cb                       cb;
+       guint                                                                    buflen;
+       gint                                                                     r;
+
+       cb = (kvstorage_write_cb)d->c->write_cb;
 
-       cb = (kvstorage_write_cb)c->write_cb;
+       if (what == EV_TIMEOUT) {
+               cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+               return;
+       }
+       if (d->c->state == KV_STATE_SET) {
+               /* Create, fill and write buffer */
+               buflen = d->datalen + d->keylen + sizeof ("set  4294967296 4294967296 4294967296" CRLF);
+               d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool);
+
+               r = rspamd_snprintf (d->buf->data, d->buf->len, "set %*s %ud %ud %ud" CRLF "%*s",
+                               d->keylen, d->key, 0, d->expire, d->datalen, d->datalen, d->data);
+               d->buf->len = r;
+               r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+               if (r > 0) {
+                       /* Write more data at next call of this function */
+                       d->c->state = KV_STATE_WRITE_DATA;
+                       /* Event magic */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_write_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       /* We have written everything */
+                       d->c->state = KV_STATE_READ_REPLY;
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else {
+                       /* Error occured during writing */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+               }
+       }
+       else if (d->c->state == KV_STATE_WRITE_DATA) {
+               r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+               if (r > 0) {
+                       /* Write more data at next call of this function */
+                       d->c->state = KV_STATE_WRITE_DATA;
+                       /* Event magic */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_write_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       /* We have written everything */
+                       d->c->state = KV_STATE_READ_REPLY;
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else {
+                       /* Error occured during writing */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+               }
+       }
+       else if (d->c->state == KV_STATE_READ_REPLY) {
+               /* Got something from server, try to read line */
+               r = rspamd_kvstorage_buf_readline (d->buf, d->c);
+               if (r == EAGAIN) {
+                       /* Read more data */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+               }
+               else {
+                       /* Error occured during reading reply line */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+               }
+       }
 }
 
 static void
 rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
 {
-       struct rspamd_kvstorage_connection              *c = ud;
+       struct rspamd_kvstorage_async_data              *d = ud;
        kvstorage_write_cb                       cb;
+       guint                                                                    buflen;
+       gint                                                                     r;
 
-       cb = c->write_cb;
+       cb = (kvstorage_write_cb)d->c->write_cb;
+
+       if (what == EV_TIMEOUT) {
+               cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+               return;
+       }
+       if (d->c->state == KV_STATE_SET) {
+               /* Create, fill and write buffer */
+               buflen = MAX (MAX_KV_LINE, d->keylen + sizeof ("delete " CRLF));
+               d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool);
+
+               r = rspamd_snprintf (d->buf->data, d->buf->len, "delete %*s" CRLF,
+                               d->keylen, d->key);
+               d->buf->len = r;
+               r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+               if (r > 0) {
+                       /* Write more data at next call of this function */
+                       d->c->state = KV_STATE_WRITE_DATA;
+                       /* Event magic */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       /* We have written everything */
+                       d->c->state = KV_STATE_READ_REPLY;
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else {
+                       /* Error occured during writing */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+               }
+       }
+       else if (d->c->state == KV_STATE_WRITE_DATA) {
+               r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+               if (r > 0) {
+                       /* Write more data at next call of this function */
+                       d->c->state = KV_STATE_WRITE_DATA;
+                       /* Event magic */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       /* We have written everything */
+                       d->c->state = KV_STATE_READ_REPLY;
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else {
+                       /* Error occured during writing */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+               }
+       }
+       else if (d->c->state == KV_STATE_READ_REPLY) {
+               /* Got something from server, try to read line */
+               r = rspamd_kvstorage_buf_readline (d->buf, d->c);
+               if (r == EAGAIN) {
+                       /* Read more data */
+                       event_del (&d->c->ev);
+                       event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
+                       if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+                               event_add (&d->c->ev, &d->c->tv);
+                       }
+                       else {
+                               event_add (&d->c->ev, NULL);
+                       }
+               }
+               else if (r == 0) {
+                       cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+               }
+               else {
+                       /* Error occured during reading reply line */
+                       cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+               }
+       }
 }
 
 /**
@@ -385,6 +769,7 @@ rspamd_kvstorage_connect_async (const gchar *host,
                struct rspamd_kvstorage_connection **conn)
 {
        struct rspamd_kvstorage_connection              *new;
+       struct rspamd_kvstorage_async_data      *data;
        gint                                                                     sock;
 
        /* Here we do NOT try to resolve hostname */
@@ -405,12 +790,15 @@ rspamd_kvstorage_connect_async (const gchar *host,
        else {
                memset (&new->tv, 0, sizeof (struct timeval));
        }
+
        new->conn_cb = cb;
-       new->cur_ud = ud;
        new->pool = memory_pool_new (memory_pool_get_size ());
+       data = memory_pool_alloc (new->pool, sizeof (struct rspamd_kvstorage_async_data));
+       data->ud = ud;
+       data->c = new;
 
        /* Set event */
-       event_set (&new->ev, new->sock, EV_WRITE, rspamd_kvstorage_connect_cb, new);
+       event_set (&new->ev, new->sock, EV_WRITE, rspamd_kvstorage_connect_cb, data);
        if (tv != NULL) {
                event_add (&new->ev, &new->tv);
        }
@@ -433,17 +821,23 @@ enum rspamd_kvstorage_error
 rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
                const gpointer key, kvstorage_read_cb cb, gpointer ud)
 {
+       struct rspamd_kvstorage_async_data              *d;
+
        if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
                return KVSTORAGE_ERROR_INTERNAL_ERROR;
        }
        else {
                conn->read_cb = cb;
-               conn->cur_ud = ud;
-               conn->cur_key = memory_pool_strdup (conn->pool, key);
+               d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data));
+               d->ud = ud;
+               d->c = conn;
+               d->ud = ud;
+               d->key = memory_pool_strdup (conn->pool, key);
+               d->keylen = strlen (d->key);
                conn->state = KV_STATE_GET;
 
                /* Set event */
-               event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_read_cb, conn);
+               event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_read_cb, d);
                if (conn->tv.tv_sec != 0) {
                        event_add (&conn->ev, &conn->tv);
                }
@@ -466,18 +860,25 @@ enum rspamd_kvstorage_error
 rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
                const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud)
 {
+       struct rspamd_kvstorage_async_data              *d;
+
        if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
                return KVSTORAGE_ERROR_INTERNAL_ERROR;
        }
        else {
                conn->write_cb = cb;
-               conn->cur_ud = ud;
-               conn->cur_key = memory_pool_strdup (conn->pool, key);
-               conn->cur_value = value;
+               d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data));
+               d->ud = ud;
+               d->c = conn;
+               d->ud = ud;
+               d->key = memory_pool_strdup (conn->pool, key);
+               d->keylen = strlen (d->key);
+               d->data = value;
+               d->datalen = len;
                conn->state = KV_STATE_SET;
 
                /* Set event */
-               event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_write_cb, conn);
+               event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_write_cb, d);
                if (conn->tv.tv_sec != 0) {
                        event_add (&conn->ev, &conn->tv);
                }
@@ -499,17 +900,23 @@ enum rspamd_kvstorage_error
 rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
                const gpointer key, kvstorage_write_cb cb, gpointer ud)
 {
+       struct rspamd_kvstorage_async_data              *d;
+
        if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
                return KVSTORAGE_ERROR_INTERNAL_ERROR;
        }
        else {
                conn->write_cb = cb;
-               conn->cur_ud = ud;
-               conn->cur_key = memory_pool_strdup (conn->pool, key);
+               d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data));
+               d->ud = ud;
+               d->c = conn;
+               d->ud = ud;
+               d->key = memory_pool_strdup (conn->pool, key);
+               d->keylen = strlen (d->key);
                conn->state = KV_STATE_SET;
 
                /* Set event */
-               event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_delete_cb, conn);
+               event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d);
                if (conn->tv.tv_sec != 0) {
                        event_add (&conn->ev, &conn->tv);
                }
@@ -695,7 +1102,7 @@ rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn,
 
        /* Create buf */
        keylen = strlen (key);
-       buflen = MAX (keylen + sizeof ("delete" CRLF), MAX_KV_LINE);
+       buflen = MAX (keylen + sizeof ("delete " CRLF), MAX_KV_LINE);
        buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
 
        r = rspamd_snprintf (buf->data, buf->len, "delete %*s" CRLF,