aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kvstorage/libkvstorageclient.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kvstorage/libkvstorageclient.c')
-rw-r--r--lib/kvstorage/libkvstorageclient.c142
1 files changed, 91 insertions, 51 deletions
diff --git a/lib/kvstorage/libkvstorageclient.c b/lib/kvstorage/libkvstorageclient.c
index 4d3b17dd3..d05e8b5e0 100644
--- a/lib/kvstorage/libkvstorageclient.c
+++ b/lib/kvstorage/libkvstorageclient.c
@@ -23,11 +23,22 @@
#include "config.h"
-#include "main.h"
+#include "mem_pool.h"
+#include "util.h"
#include "libkvstorageclient.h"
#define MAX_KV_LINE 1024
+#ifdef CRLF
+#undef CRLF
+#undef CR
+#undef LF
+#endif
+
+#define CRLF "\r\n"
+#define CR '\r'
+#define LF '\n'
+
struct kvstorage_buf {
guint pos;
guint len;
@@ -242,12 +253,13 @@ rspamd_kvstorage_parse_reply_error (struct kvstorage_buf *buf)
static enum rspamd_kvstorage_error
rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *flags)
{
- guint8 *p, *c;
+ guint8 *p, *c, *end;
gboolean error = TRUE;
gchar *err_str;
p = buf->data;
- while (p - buf->data < buf->pos) {
+ end = buf->data + buf->pos;
+ while (p < end) {
if (g_ascii_isspace (*p)) {
error = FALSE;
while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
@@ -268,10 +280,11 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
/* Here we got key, flags and size items */
/* Skip key */
error = TRUE;
- while (p - buf->data < buf->pos) {
+ while (p < end) {
if (g_ascii_isspace (*p)) {
error = FALSE;
- while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+ /* Skip spaces after key */
+ while (p < end && g_ascii_isspace (*p)) {
p ++;
}
break;
@@ -285,9 +298,10 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
/* Read flags */
c = p;
error = TRUE;
- while (p - buf->data < buf->pos) {
+ while (p < end) {
if (g_ascii_isspace (*p)) {
error = FALSE;
+ /* Skip spaces after flags */
while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
p ++;
}
@@ -308,16 +322,8 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f
}
/* Read len */
c = p;
- error = TRUE;
- while (p - buf->data < buf->pos) {
- if (g_ascii_isspace (*p)) {
- error = FALSE;
- while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
- p ++;
- }
- break;
- }
- else if (!g_ascii_isdigit (*p)) {
+ while (p < end) {
+ if (!g_ascii_isdigit (*p)) {
break;
}
p ++;
@@ -347,9 +353,9 @@ rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud)
cb (KVSTORAGE_ERROR_TIMEOUT, d->c, d->ud);
}
else {
+ d->c->state = KV_STATE_CONNECTED;
cb (KVSTORAGE_ERROR_OK, d->c, d->ud);
}
- d->c->state = KV_STATE_CONNECTED;
}
static void
@@ -364,7 +370,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
cb = (kvstorage_read_cb)d->c->read_cb;
if (what == EV_TIMEOUT) {
- cb (KVSTORAGE_ERROR_TIMEOUT, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, NULL, 0, d->c, d->ud);
return;
}
if (d->c->state == KV_STATE_GET) {
@@ -391,6 +397,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else if (r == 0) {
/* We have written everything */
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
d->c->state = KV_STATE_READ_ELT;
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
@@ -403,7 +410,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_WRITE_DATA) {
@@ -424,6 +431,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
else if (r == 0) {
/* We have written everything */
d->c->state = KV_STATE_READ_ELT;
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -435,7 +443,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_ELT) {
@@ -455,10 +463,11 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
else if (r == 0) {
/* Got all data about elt */
if ((r = rspamd_kvstorage_parse_get_line (d->buf, &d->datalen, &flags)) != KVSTORAGE_ERROR_OK) {
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
return;
}
rspamd_kvstorage_buf_drainline (d->buf);
+
/* Now allocate and read the data */
databuf = rspamd_kvstorage_buf_create (d->datalen, d->c->pool);
memcpy (databuf->data, d->buf->data, d->buf->pos);
@@ -475,7 +484,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during reading reply line */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_DATA) {
@@ -486,7 +495,6 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
d->c->state = KV_STATE_READ_REPLY;
/* Save databuf */
d->data = d->buf->data;
- d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d);
if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -509,7 +517,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_REPLY) {
@@ -527,11 +535,12 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud)
}
}
else if (r == 0) {
- cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->data, d->datalen, d->c, d->ud);
+ d->c->state = KV_STATE_CONNECTED;
+ cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->data, d->datalen, d->c, d->ud);
}
else {
/* Error occured during reading reply line */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud);
}
}
}
@@ -547,7 +556,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
cb = (kvstorage_write_cb)d->c->write_cb;
if (what == EV_TIMEOUT) {
- cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud);
return;
}
if (d->c->state == KV_STATE_SET) {
@@ -574,6 +583,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
}
else if (r == 0) {
/* We have written everything */
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
d->c->state = KV_STATE_READ_REPLY;
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
@@ -586,7 +596,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_WRITE_DATA) {
@@ -607,6 +617,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
else if (r == 0) {
/* We have written everything */
d->c->state = KV_STATE_READ_REPLY;
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d);
if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -618,7 +629,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_REPLY) {
@@ -636,11 +647,12 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud)
}
}
else if (r == 0) {
- cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+ d->c->state = KV_STATE_CONNECTED;
+ cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud);
}
else {
/* Error occured during reading reply line */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
}
@@ -656,7 +668,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
cb = (kvstorage_write_cb)d->c->write_cb;
if (what == EV_TIMEOUT) {
- cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud);
return;
}
if (d->c->state == KV_STATE_SET) {
@@ -683,6 +695,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
}
else if (r == 0) {
/* We have written everything */
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
d->c->state = KV_STATE_READ_REPLY;
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
@@ -695,7 +708,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_WRITE_DATA) {
@@ -716,6 +729,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
else if (r == 0) {
/* We have written everything */
d->c->state = KV_STATE_READ_REPLY;
+ d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool);
event_del (&d->c->ev);
event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d);
if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) {
@@ -727,7 +741,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
}
else {
/* Error occured during writing */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
else if (d->c->state == KV_STATE_READ_REPLY) {
@@ -745,11 +759,12 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud)
}
}
else if (r == 0) {
- cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud);
+ d->c->state = KV_STATE_CONNECTED;
+ cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud);
}
else {
/* Error occured during reading reply line */
- cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud);
+ cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud);
}
}
}
@@ -773,7 +788,7 @@ rspamd_kvstorage_connect_async (const gchar *host,
gint sock;
/* Here we do NOT try to resolve hostname */
- if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, FALSE)) == -1) {
+ if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, TRUE)) == -1) {
return KVSTORAGE_ERROR_SERVER_ERROR;
}
@@ -819,7 +834,7 @@ rspamd_kvstorage_connect_async (const gchar *host,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
- const gpointer key, kvstorage_read_cb cb, gpointer ud)
+ const gpointer key, guint keylen, kvstorage_read_cb cb, gpointer ud)
{
struct rspamd_kvstorage_async_data *d;
@@ -833,7 +848,7 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
d->c = conn;
d->ud = ud;
d->key = memory_pool_strdup (conn->pool, key);
- d->keylen = strlen (d->key);
+ d->keylen = keylen;
conn->state = KV_STATE_GET;
/* Set event */
@@ -858,7 +873,8 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
- const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud)
+ const gpointer key, guint keylen, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb,
+ gpointer ud)
{
struct rspamd_kvstorage_async_data *d;
@@ -872,7 +888,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
d->c = conn;
d->ud = ud;
d->key = memory_pool_strdup (conn->pool, key);
- d->keylen = strlen (d->key);
+ d->keylen = keylen;
d->data = value;
d->datalen = len;
conn->state = KV_STATE_SET;
@@ -898,7 +914,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
- const gpointer key, kvstorage_write_cb cb, gpointer ud)
+ const gpointer key, guint keylen, kvstorage_write_cb cb, gpointer ud)
{
struct rspamd_kvstorage_async_data *d;
@@ -912,7 +928,7 @@ rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn,
d->c = conn;
d->ud = ud;
d->key = memory_pool_strdup (conn->pool, key);
- d->keylen = strlen (d->key);
+ d->keylen = keylen;
conn->state = KV_STATE_SET;
/* Set event */
@@ -989,7 +1005,7 @@ rspamd_kvstorage_connect_sync (const gchar *host,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key, gpointer **value, guint *len)
+ const gpointer key, guint keylen, gpointer **value, guint *len)
{
struct kvstorage_buf *buf, *databuf;
gint r;
@@ -1001,7 +1017,7 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, conn->pool);
- r = rspamd_snprintf (buf->data, buf->len, "get %s" CRLF, key);
+ r = rspamd_snprintf (buf->data, buf->len, "get %*s" CRLF, keylen, key);
buf->len = r;
while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
@@ -1051,17 +1067,16 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key, const gpointer value, gsize len, guint expire)
+ const gpointer key, guint keylen, const gpointer value, gsize len, guint expire)
{
struct kvstorage_buf *buf;
- gint r, keylen, buflen;
+ gint r, buflen;
if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
return KVSTORAGE_ERROR_INTERNAL_ERROR;
}
/* Create buf */
- keylen = strlen (key);
buflen = len + keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF);
buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
@@ -1091,17 +1106,16 @@ rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key)
+ const gpointer key, guint keylen)
{
struct kvstorage_buf *buf;
- gint r, keylen, buflen;
+ gint r, buflen;
if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
return KVSTORAGE_ERROR_INTERNAL_ERROR;
}
/* Create buf */
- keylen = strlen (key);
buflen = MAX (keylen + sizeof ("delete " CRLF), MAX_KV_LINE);
buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
@@ -1137,3 +1151,29 @@ rspamd_kvstorage_close_sync (struct rspamd_kvstorage_connection *conn)
return KVSTORAGE_ERROR_OK;
}
+
+const gchar*
+rspamd_kvstorage_strerror (enum rspamd_kvstorage_error err)
+{
+ switch (err) {
+ case KVSTORAGE_ERROR_OK:
+ return "operation completed";
+ case KVSTORAGE_ERROR_TIMEOUT:
+ return "operation timeout";
+ case KVSTORAGE_ERROR_NOT_FOUND:
+ return "key not found";
+ case KVSTORAGE_ERROR_NOT_STORED:
+ return "key not stored";
+ case KVSTORAGE_ERROR_EXISTS:
+ return "key exists";
+ case KVSTORAGE_ERROR_SERVER_ERROR:
+ return "server error";
+ case KVSTORAGE_ERROR_CLIENT_ERROR:
+ return "client error";
+ case KVSTORAGE_ERROR_INTERNAL_ERROR:
+ return "library error";
+ }
+
+ /* Not reached */
+ return "unknown error";
+}