diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-24 20:47:39 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-24 20:47:39 +0300 |
commit | 88f4ea9a5b1e55169fa23ad2b7ffe7985fb8c3ed (patch) | |
tree | e091c67f33784518626cfdc6e078b517d9bf9546 | |
parent | da3412c27b23e8dbb9330b5c93e4b040d8b6f178 (diff) | |
download | rspamd-88f4ea9a5b1e55169fa23ad2b7ffe7985fb8c3ed.tar.gz rspamd-88f4ea9a5b1e55169fa23ad2b7ffe7985fb8c3ed.zip |
* Implement initial version of asynced kvstorage API
-rw-r--r-- | lib/kvstorage/libkvstorageclient.c | 463 |
1 files changed, 435 insertions, 28 deletions
diff --git a/lib/kvstorage/libkvstorageclient.c b/lib/kvstorage/libkvstorageclient.c index 7bbb9732d..4d3b17dd3 100644 --- a/lib/kvstorage/libkvstorageclient.c +++ b/lib/kvstorage/libkvstorageclient.c @@ -45,16 +45,25 @@ struct rspamd_kvstorage_connection { KV_STATE_GET, KV_STATE_WRITE_DATA, KV_STATE_READ_DATA, + KV_STATE_READ_ELT, KV_STATE_READ_REPLY } state; struct event ev; kvstorage_connect_cb conn_cb; kvstorage_read_cb read_cb; kvstorage_write_cb write_cb; - gpointer cur_ud; memory_pool_t *pool; - gpointer cur_key; - gpointer cur_value; +}; + +struct rspamd_kvstorage_async_data { + struct rspamd_kvstorage_connection *c; + struct kvstorage_buf *buf; + gchar *key; + guint keylen; + gpointer data; + guint datalen; + guint expire; + gpointer ud; }; /* @@ -329,45 +338,420 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f static void rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud) { - struct rspamd_kvstorage_connection *c = ud; + struct rspamd_kvstorage_async_data *d = ud; kvstorage_connect_cb cb; - cb = (kvstorage_connect_cb)c->conn_cb; + cb = (kvstorage_connect_cb)d->c->conn_cb; if (what == EV_TIMEOUT) { - cb (KVSTORAGE_ERROR_TIMEOUT, c, c->cur_ud); + cb (KVSTORAGE_ERROR_TIMEOUT, d->c, d->ud); } else { - cb (KVSTORAGE_ERROR_OK, c, c->cur_ud); + cb (KVSTORAGE_ERROR_OK, d->c, d->ud); } - c->state = KV_STATE_CONNECTED; + d->c->state = KV_STATE_CONNECTED; } static void rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) { - struct rspamd_kvstorage_connection *c = ud; - kvstorage_read_cb cb; + struct rspamd_kvstorage_async_data *d = ud; + kvstorage_read_cb cb; + guint buflen, flags; + gint r; + struct kvstorage_buf *databuf; - cb = (kvstorage_read_cb)c->read_cb; + cb = (kvstorage_read_cb)d->c->read_cb; + + if (what == EV_TIMEOUT) { + cb (KVSTORAGE_ERROR_TIMEOUT, d->key, NULL, 0, d->c, d->ud); + return; + } + if (d->c->state == KV_STATE_GET) { + /* Create, fill and write buffer */ + buflen = d->keylen + sizeof ("get " CRLF); + d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool); + + r = rspamd_snprintf (d->buf->data, d->buf->len, "get %*s" CRLF, + d->keylen, d->key); + d->buf->len = r; + r = rspamd_kvstorage_buf_writeall (d->buf, d->c); + if (r > 0) { + /* Write more data at next call of this function */ + d->c->state = KV_STATE_WRITE_DATA; + /* Event magic */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_read_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + /* We have written everything */ + d->c->state = KV_STATE_READ_ELT; + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else { + /* Error occured during writing */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + } + } + else if (d->c->state == KV_STATE_WRITE_DATA) { + r = rspamd_kvstorage_buf_writeall (d->buf, d->c); + if (r > 0) { + /* Write more data at next call of this function */ + d->c->state = KV_STATE_WRITE_DATA; + /* Event magic */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_read_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + /* We have written everything */ + d->c->state = KV_STATE_READ_ELT; + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else { + /* Error occured during writing */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + } + } + else if (d->c->state == KV_STATE_READ_ELT) { + /* Read element info */ + r = rspamd_kvstorage_buf_readline (d->buf, d->c); + if (r == EAGAIN) { + /* Read more data */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + /* Got all data about elt */ + if ((r = rspamd_kvstorage_parse_get_line (d->buf, &d->datalen, &flags)) != KVSTORAGE_ERROR_OK) { + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + return; + } + rspamd_kvstorage_buf_drainline (d->buf); + /* Now allocate and read the data */ + databuf = rspamd_kvstorage_buf_create (d->datalen, d->c->pool); + memcpy (databuf->data, d->buf->data, d->buf->pos); + d->buf = databuf; + d->c->state = KV_STATE_READ_DATA; + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else { + /* Error occured during reading reply line */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + } + } + else if (d->c->state == KV_STATE_READ_DATA) { + /* Read data to the buffer */ + r = rspamd_kvstorage_buf_readall (d->buf, d->c); + if (r == 0) { + /* All data read, read the last line */ + d->c->state = KV_STATE_READ_REPLY; + /* Save databuf */ + d->data = d->buf->data; + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r > 0) { + /* Read more data into buffer */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else { + /* Error occured */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + } + } + else if (d->c->state == KV_STATE_READ_REPLY) { + /* Got something from server, try to read line */ + r = rspamd_kvstorage_buf_readline (d->buf, d->c); + if (r == EAGAIN) { + /* Read more data */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->data, d->datalen, d->c, d->ud); + } + else { + /* Error occured during reading reply line */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + } + } } static void rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) { - struct rspamd_kvstorage_connection *c = ud; + struct rspamd_kvstorage_async_data *d = ud; kvstorage_write_cb cb; + guint buflen; + gint r; + + cb = (kvstorage_write_cb)d->c->write_cb; - cb = (kvstorage_write_cb)c->write_cb; + if (what == EV_TIMEOUT) { + cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud); + return; + } + if (d->c->state == KV_STATE_SET) { + /* Create, fill and write buffer */ + buflen = d->datalen + d->keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF); + d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool); + + r = rspamd_snprintf (d->buf->data, d->buf->len, "set %*s %ud %ud %ud" CRLF "%*s", + d->keylen, d->key, 0, d->expire, d->datalen, d->datalen, d->data); + d->buf->len = r; + r = rspamd_kvstorage_buf_writeall (d->buf, d->c); + if (r > 0) { + /* Write more data at next call of this function */ + d->c->state = KV_STATE_WRITE_DATA; + /* Event magic */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_write_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + /* We have written everything */ + d->c->state = KV_STATE_READ_REPLY; + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else { + /* Error occured during writing */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + } + } + else if (d->c->state == KV_STATE_WRITE_DATA) { + r = rspamd_kvstorage_buf_writeall (d->buf, d->c); + if (r > 0) { + /* Write more data at next call of this function */ + d->c->state = KV_STATE_WRITE_DATA; + /* Event magic */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_write_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + /* We have written everything */ + d->c->state = KV_STATE_READ_REPLY; + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else { + /* Error occured during writing */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + } + } + else if (d->c->state == KV_STATE_READ_REPLY) { + /* Got something from server, try to read line */ + r = rspamd_kvstorage_buf_readline (d->buf, d->c); + if (r == EAGAIN) { + /* Read more data */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud); + } + else { + /* Error occured during reading reply line */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + } + } } static void rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) { - struct rspamd_kvstorage_connection *c = ud; + struct rspamd_kvstorage_async_data *d = ud; kvstorage_write_cb cb; + guint buflen; + gint r; - cb = c->write_cb; + cb = (kvstorage_write_cb)d->c->write_cb; + + if (what == EV_TIMEOUT) { + cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud); + return; + } + if (d->c->state == KV_STATE_SET) { + /* Create, fill and write buffer */ + buflen = MAX (MAX_KV_LINE, d->keylen + sizeof ("delete " CRLF)); + d->buf = rspamd_kvstorage_buf_create (buflen, d->c->pool); + + r = rspamd_snprintf (d->buf->data, d->buf->len, "delete %*s" CRLF, + d->keylen, d->key); + d->buf->len = r; + r = rspamd_kvstorage_buf_writeall (d->buf, d->c); + if (r > 0) { + /* Write more data at next call of this function */ + d->c->state = KV_STATE_WRITE_DATA; + /* Event magic */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + /* We have written everything */ + d->c->state = KV_STATE_READ_REPLY; + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else { + /* Error occured during writing */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + } + } + else if (d->c->state == KV_STATE_WRITE_DATA) { + r = rspamd_kvstorage_buf_writeall (d->buf, d->c); + if (r > 0) { + /* Write more data at next call of this function */ + d->c->state = KV_STATE_WRITE_DATA; + /* Event magic */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + /* We have written everything */ + d->c->state = KV_STATE_READ_REPLY; + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else { + /* Error occured during writing */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + } + } + else if (d->c->state == KV_STATE_READ_REPLY) { + /* Got something from server, try to read line */ + r = rspamd_kvstorage_buf_readline (d->buf, d->c); + if (r == EAGAIN) { + /* Read more data */ + event_del (&d->c->ev); + event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d); + if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { + event_add (&d->c->ev, &d->c->tv); + } + else { + event_add (&d->c->ev, NULL); + } + } + else if (r == 0) { + cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud); + } + else { + /* Error occured during reading reply line */ + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + } + } } /** @@ -385,6 +769,7 @@ rspamd_kvstorage_connect_async (const gchar *host, struct rspamd_kvstorage_connection **conn) { struct rspamd_kvstorage_connection *new; + struct rspamd_kvstorage_async_data *data; gint sock; /* Here we do NOT try to resolve hostname */ @@ -405,12 +790,15 @@ rspamd_kvstorage_connect_async (const gchar *host, else { memset (&new->tv, 0, sizeof (struct timeval)); } + new->conn_cb = cb; - new->cur_ud = ud; new->pool = memory_pool_new (memory_pool_get_size ()); + data = memory_pool_alloc (new->pool, sizeof (struct rspamd_kvstorage_async_data)); + data->ud = ud; + data->c = new; /* Set event */ - event_set (&new->ev, new->sock, EV_WRITE, rspamd_kvstorage_connect_cb, new); + event_set (&new->ev, new->sock, EV_WRITE, rspamd_kvstorage_connect_cb, data); if (tv != NULL) { event_add (&new->ev, &new->tv); } @@ -433,17 +821,23 @@ enum rspamd_kvstorage_error rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn, const gpointer key, kvstorage_read_cb cb, gpointer ud) { + struct rspamd_kvstorage_async_data *d; + if (conn == NULL || conn->state != KV_STATE_CONNECTED) { return KVSTORAGE_ERROR_INTERNAL_ERROR; } else { conn->read_cb = cb; - conn->cur_ud = ud; - conn->cur_key = memory_pool_strdup (conn->pool, key); + d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data)); + d->ud = ud; + d->c = conn; + d->ud = ud; + d->key = memory_pool_strdup (conn->pool, key); + d->keylen = strlen (d->key); conn->state = KV_STATE_GET; /* Set event */ - event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_read_cb, conn); + event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_read_cb, d); if (conn->tv.tv_sec != 0) { event_add (&conn->ev, &conn->tv); } @@ -466,18 +860,25 @@ enum rspamd_kvstorage_error rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn, const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud) { + struct rspamd_kvstorage_async_data *d; + if (conn == NULL || conn->state != KV_STATE_CONNECTED) { return KVSTORAGE_ERROR_INTERNAL_ERROR; } else { conn->write_cb = cb; - conn->cur_ud = ud; - conn->cur_key = memory_pool_strdup (conn->pool, key); - conn->cur_value = value; + d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data)); + d->ud = ud; + d->c = conn; + d->ud = ud; + d->key = memory_pool_strdup (conn->pool, key); + d->keylen = strlen (d->key); + d->data = value; + d->datalen = len; conn->state = KV_STATE_SET; /* Set event */ - event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_write_cb, conn); + event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_write_cb, d); if (conn->tv.tv_sec != 0) { event_add (&conn->ev, &conn->tv); } @@ -499,17 +900,23 @@ enum rspamd_kvstorage_error rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn, const gpointer key, kvstorage_write_cb cb, gpointer ud) { + struct rspamd_kvstorage_async_data *d; + if (conn == NULL || conn->state != KV_STATE_CONNECTED) { return KVSTORAGE_ERROR_INTERNAL_ERROR; } else { conn->write_cb = cb; - conn->cur_ud = ud; - conn->cur_key = memory_pool_strdup (conn->pool, key); + d = memory_pool_alloc (conn->pool, sizeof (struct rspamd_kvstorage_async_data)); + d->ud = ud; + d->c = conn; + d->ud = ud; + d->key = memory_pool_strdup (conn->pool, key); + d->keylen = strlen (d->key); conn->state = KV_STATE_SET; /* Set event */ - event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_delete_cb, conn); + event_set (&conn->ev, conn->sock, EV_WRITE, rspamd_kvstorage_delete_cb, d); if (conn->tv.tv_sec != 0) { event_add (&conn->ev, &conn->tv); } @@ -695,7 +1102,7 @@ rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn, /* Create buf */ keylen = strlen (key); - buflen = MAX (keylen + sizeof ("delete" CRLF), MAX_KV_LINE); + buflen = MAX (keylen + sizeof ("delete " CRLF), MAX_KV_LINE); buf = rspamd_kvstorage_buf_create (buflen, conn->pool); r = rspamd_snprintf (buf->data, buf->len, "delete %*s" CRLF, |