KV_STATE_GET,
KV_STATE_WRITE_DATA,
KV_STATE_READ_DATA,
+ KV_STATE_READ_ELT,
KV_STATE_READ_REPLY
} state;
struct event ev;
kvstorage_connect_cb conn_cb;
kvstorage_read_cb read_cb;
kvstorage_write_cb write_cb;
- gpointer cur_ud;
memory_pool_t *pool;
- gpointer cur_key;
- gpointer cur_value;
+};
+
+struct rspamd_kvstorage_async_data {
+ struct rspamd_kvstorage_connection *c;
+ struct kvstorage_buf *buf;
+ gchar *key;
+ guint keylen;
+ gpointer data;
+ guint datalen;
+ guint expire;
+ gpointer ud;
};
/*
static void
rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud)
{
- struct rspamd_kvstorage_connection *c = ud;
+ struct rspamd_kvstorage_async_data *d = ud;
kvstorage_connect_cb cb;
- cb = (kvstorage_connect_cb)c->conn_cb;
+ cb = (kvstorage_connect_cb)d->c->conn_cb;
if (what == EV_TIMEOUT) {
- cb (KVSTORAGE_ERROR_TIMEOUT, c, c->cur_ud);
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->c, d->ud);
}
else {
- cb (KVSTORAGE_ERROR_OK, c, c->cur_ud);
+ cb (KVSTORAGE_ERROR_OK, d->c, d->ud);
}
- c->state = KV_STATE_CONNECTED;
+ d->c->state = KV_STATE_CONNECTED;
}
static void
rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
{
- struct rspamd_kvstorage_connection *c = ud;
- kvstorage_read_cb cb;
+ struct rspamd_kvstorage_async_data *d = ud;
+ kvstorage_read_cb cb;
+ guint buflen, flags;
+ gint r;
+ struct kvstorage_buf *databuf;
- cb = (kvstorage_read_cb)c->read_cb;
+ cb = (kvstorage_read_cb)d->c->read_cb;
+
+ if (what == EV_TIMEOUT) {
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->key, NULL, 0, d->c, d->ud);
+ return;
+ }
+ if (d->c->state == KV_STATE_GET) {
+ /* Create, fill and write buffer */
+ buflen = d->keylen + sizeof ("get " CRLF);
+ d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool);
+
+ r = rspamd_snprintf (d->buf->data, d->buf->len, "get %*s" CRLF,
+ d->keylen, d->key);
+ d->buf->len = r;
+ r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+ if (r > 0) {
+ /* Write more data at next call of this function */
+ d->c->state = KV_STATE_WRITE_DATA;
+ /* Event magic */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_read_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ /* We have written everything */
+ d->c->state = KV_STATE_READ_ELT;
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else {
+ /* Error occured during writing */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ }
+ }
+ else if (d->c->state == KV_STATE_WRITE_DATA) {
+ r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+ if (r > 0) {
+ /* Write more data at next call of this function */
+ d->c->state = KV_STATE_WRITE_DATA;
+ /* Event magic */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_read_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ /* We have written everything */
+ d->c->state = KV_STATE_READ_ELT;
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else {
+ /* Error occured during writing */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ }
+ }
+ else if (d->c->state == KV_STATE_READ_ELT) {
+ /* Read element info */
+ r = rspamd_kvstorage_buf_readline (d->buf, d->c);
+ if (r == EAGAIN) {
+ /* Read more data */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ /* Got all data about elt */
+ if ((r = rspamd_kvstorage_parse_get_line (d->buf, &d->datalen, &flags)) != KVSTORAGE_ERROR_OK) {
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ return;
+ }
+ rspamd_kvstorage_buf_drainline (d->buf);
+ /* Now allocate and read the data */
+ databuf = rspamd_kvstorage_buf_create (d->datalen, d->c->pool);
+ memcpy (databuf->data, d->buf->data, d->buf->pos);
+ d->buf = databuf;
+ d->c->state = KV_STATE_READ_DATA;
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else {
+ /* Error occured during reading reply line */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ }
+ }
+ else if (d->c->state == KV_STATE_READ_DATA) {
+ /* Read data to the buffer */
+ r = rspamd_kvstorage_buf_readall (d->buf, d->c);
+ if (r == 0) {
+ /* All data read, read the last line */
+ d->c->state = KV_STATE_READ_REPLY;
+ /* Save databuf */
+ d->data = d->buf->data;
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r > 0) {
+ /* Read more data into buffer */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else {
+ /* Error occured */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ }
+ }
+ else if (d->c->state == KV_STATE_READ_REPLY) {
+ /* Got something from server, try to read line */
+ r = rspamd_kvstorage_buf_readline (d->buf, d->c);
+ if (r == EAGAIN) {
+ /* Read more data */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->data, d->datalen, d->c, d->ud);
+ }
+ else {
+ /* Error occured during reading reply line */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ }
+ }
}
static void
rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
{
- struct rspamd_kvstorage_connection *c = ud;
+ struct rspamd_kvstorage_async_data *d = ud;
kvstorage_write_cb cb;
+ guint buflen;
+ gint r;
+
+ cb = (kvstorage_write_cb)d->c->write_cb;
- cb = (kvstorage_write_cb)c->write_cb;
+ if (what == EV_TIMEOUT) {
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+ return;
+ }
+ if (d->c->state == KV_STATE_SET) {
+ /* Create, fill and write buffer */
+ buflen = d->datalen + d->keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF);
+ d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool);
+
+ r = rspamd_snprintf (d->buf->data, d->buf->len, "set %*s %ud %ud %ud" CRLF "%*s",
+ d->keylen, d->key, 0, d->expire, d->datalen, d->datalen, d->data);
+ d->buf->len = r;
+ r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+ if (r > 0) {
+ /* Write more data at next call of this function */
+ d->c->state = KV_STATE_WRITE_DATA;
+ /* Event magic */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_write_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ /* We have written everything */
+ d->c->state = KV_STATE_READ_REPLY;
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else {
+ /* Error occured during writing */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ }
+ }
+ else if (d->c->state == KV_STATE_WRITE_DATA) {
+ r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+ if (r > 0) {
+ /* Write more data at next call of this function */
+ d->c->state = KV_STATE_WRITE_DATA;
+ /* Event magic */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_write_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ /* We have written everything */
+ d->c->state = KV_STATE_READ_REPLY;
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else {
+ /* Error occured during writing */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ }
+ }
+ else if (d->c->state == KV_STATE_READ_REPLY) {
+ /* Got something from server, try to read line */
+ r = rspamd_kvstorage_buf_readline (d->buf, d->c);
+ if (r == EAGAIN) {
+ /* Read more data */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+ }
+ else {
+ /* Error occured during reading reply line */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ }
+ }
}
static void
rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
{
- struct rspamd_kvstorage_connection *c = ud;
+ struct rspamd_kvstorage_async_data *d = ud;
kvstorage_write_cb cb;
+ guint buflen;
+ gint r;
- cb = c->write_cb;
+ cb = (kvstorage_write_cb)d->c->write_cb;
+
+ if (what == EV_TIMEOUT) {
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+ return;
+ }
+ if (d->c->state == KV_STATE_SET) {
+ /* Create, fill and write buffer */
+ buflen = MAX (MAX_KV_LINE, d->keylen + sizeof ("delete " CRLF));
+ d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool);
+
+ r = rspamd_snprintf (d->buf->data, d->buf->len, "delete %*s" CRLF,
+ d->keylen, d->key);
+ d->buf->len = r;
+ r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+ if (r > 0) {
+ /* Write more data at next call of this function */
+ d->c->state = KV_STATE_WRITE_DATA;
+ /* Event magic */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ /* We have written everything */
+ d->c->state = KV_STATE_READ_REPLY;
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else {
+ /* Error occured during writing */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ }
+ }
+ else if (d->c->state == KV_STATE_WRITE_DATA) {
+ r = rspamd_kvstorage_buf_writeall (d->buf, d->c);
+ if (r > 0) {
+ /* Write more data at next call of this function */
+ d->c->state = KV_STATE_WRITE_DATA;
+ /* Event magic */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ /* We have written everything */
+ d->c->state = KV_STATE_READ_REPLY;
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else {
+ /* Error occured during writing */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ }
+ }
+ else if (d->c->state == KV_STATE_READ_REPLY) {
+ /* Got something from server, try to read line */
+ r = rspamd_kvstorage_buf_readline (d->buf, d->c);
+ if (r == EAGAIN) {
+ /* Read more data */
+ event_del (&d->c->ev);
+ event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
+ if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
+ event_add (&d->c->ev, &d->c->tv);
+ }
+ else {
+ event_add (&d->c->ev, NULL);
+ }
+ }
+ else if (r == 0) {
+ cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+ }
+ else {
+ /* Error occured during reading reply line */
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ }
+ }
}
/**
struct rspamd_kvstorage_connection **conn)
{
struct rspamd_kvstorage_connection *new;
+ struct rspamd_kvstorage_async_data *data;
gint sock;
/* Here we do NOT try to resolve hostname */
else {
memset (&new->tv, 0, sizeof (struct timeval));
}
+
new->conn_cb = cb;
- new->cur_ud = ud;
new->pool = memory_pool_new (memory_pool_get_size ());
+ data = memory_pool_alloc (new->pool, sizeof (struct rspamd_kvstorage_async_data));
+ data->ud = ud;
+ data->c = new;
/* Set event */
- event_set (&new->ev, new->sock, EV_WRITE, rspamd_kvstorage_connect_cb, new);
+ event_set (&new->ev, new->sock, EV_WRITE, rspamd_kvstorage_connect_cb, data);
if (tv != NULL) {
event_add (&new->ev, &new->tv);
}
rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
const gpointer key, kvstorage_read_cb cb, gpointer ud)
{
+ struct rspamd_kvstorage_async_data *d;
+
if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
return KVSTORAGE_ERROR_INTERNAL_ERROR;
}
else {
conn->read_cb = cb;
- conn->cur_ud = ud;
- conn->cur_key = memory_pool_strdup (conn->pool, key);
+ d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data));
+ d->ud = ud;
+ d->c = conn;
+ d->ud = ud;
+ d->key = memory_pool_strdup (conn->pool, key);
+ d->keylen = strlen (d->key);
conn->state = KV_STATE_GET;
/* Set event */
- event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_read_cb, conn);
+ event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_read_cb, d);
if (conn->tv.tv_sec != 0) {
event_add (&conn->ev, &conn->tv);
}
rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud)
{
+ struct rspamd_kvstorage_async_data *d;
+
if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
return KVSTORAGE_ERROR_INTERNAL_ERROR;
}
else {
conn->write_cb = cb;
- conn->cur_ud = ud;
- conn->cur_key = memory_pool_strdup (conn->pool, key);
- conn->cur_value = value;
+ d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data));
+ d->ud = ud;
+ d->c = conn;
+ d->ud = ud;
+ d->key = memory_pool_strdup (conn->pool, key);
+ d->keylen = strlen (d->key);
+ d->data = value;
+ d->datalen = len;
conn->state = KV_STATE_SET;
/* Set event */
- event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_write_cb, conn);
+ event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_write_cb, d);
if (conn->tv.tv_sec != 0) {
event_add (&conn->ev, &conn->tv);
}
rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
const gpointer key, kvstorage_write_cb cb, gpointer ud)
{
+ struct rspamd_kvstorage_async_data *d;
+
if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
return KVSTORAGE_ERROR_INTERNAL_ERROR;
}
else {
conn->write_cb = cb;
- conn->cur_ud = ud;
- conn->cur_key = memory_pool_strdup (conn->pool, key);
+ d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data));
+ d->ud = ud;
+ d->c = conn;
+ d->ud = ud;
+ d->key = memory_pool_strdup (conn->pool, key);
+ d->keylen = strlen (d->key);
conn->state = KV_STATE_SET;
/* Set event */
- event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_delete_cb, conn);
+ event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d);
if (conn->tv.tv_sec != 0) {
event_add (&conn->ev, &conn->tv);
}
/* Create buf */
keylen = strlen (key);
- buflen = MAX (keylen + sizeof ("delete" CRLF), MAX_KV_LINE);
+ buflen = MAX (keylen + sizeof ("delete " CRLF), MAX_KV_LINE);
buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
r = rspamd_snprintf (buf->data, buf->len, "delete %*s" CRLF,