diff options
Diffstat (limited to 'lib/kvstorage/libkvstorageclient.c')
-rw-r--r-- | lib/kvstorage/libkvstorageclient.c | 142 |
1 files changed, 91 insertions, 51 deletions
diff --git a/lib/kvstorage/libkvstorageclient.c b/lib/kvstorage/libkvstorageclient.c index 4d3b17dd3..d05e8b5e0 100644 --- a/lib/kvstorage/libkvstorageclient.c +++ b/lib/kvstorage/libkvstorageclient.c @@ -23,11 +23,22 @@ #include "config.h" -#include "main.h" +#include "mem_pool.h" +#include "util.h" #include "libkvstorageclient.h" #define MAX_KV_LINE 1024 +#ifdef CRLF +#undef CRLF +#undef CR +#undef LF +#endif + +#define CRLF "\r\n" +#define CR '\r' +#define LF '\n' + struct kvstorage_buf { guint pos; guint len; @@ -242,12 +253,13 @@ rspamd_kvstorage_parse_reply_error (struct kvstorage_buf *buf) static enum rspamd_kvstorage_error rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *flags) { - guint8 *p, *c; + guint8 *p, *c, *end; gboolean error = TRUE; gchar *err_str; p = buf->data; - while (p - buf->data < buf->pos) { + end = buf->data + buf->pos; + while (p < end) { if (g_ascii_isspace (*p)) { error = FALSE; while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { @@ -268,10 +280,11 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f /* Here we got key, flags and size items */ /* Skip key */ error = TRUE; - while (p - buf->data < buf->pos) { + while (p < end) { if (g_ascii_isspace (*p)) { error = FALSE; - while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { + /* Skip spaces after key */ + while (p < end && g_ascii_isspace (*p)) { p ++; } break; @@ -285,9 +298,10 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f /* Read flags */ c = p; error = TRUE; - while (p - buf->data < buf->pos) { + while (p < end) { if (g_ascii_isspace (*p)) { error = FALSE; + /* Skip spaces after flags */ while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { p ++; } @@ -308,16 +322,8 @@ rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *f } /* Read len */ c = p; - error = TRUE; - while (p - buf->data < buf->pos) { - if (g_ascii_isspace (*p)) { - error = FALSE; - while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { - p ++; - } - break; - } - else if (!g_ascii_isdigit (*p)) { + while (p < end) { + if (!g_ascii_isdigit (*p)) { break; } p ++; @@ -347,9 +353,9 @@ rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud) cb (KVSTORAGE_ERROR_TIMEOUT, d->c, d->ud); } else { + d->c->state = KV_STATE_CONNECTED; cb (KVSTORAGE_ERROR_OK, d->c, d->ud); } - d->c->state = KV_STATE_CONNECTED; } static void @@ -364,7 +370,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) cb = (kvstorage_read_cb)d->c->read_cb; if (what == EV_TIMEOUT) { - cb (KVSTORAGE_ERROR_TIMEOUT, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, NULL, 0, d->c, d->ud); return; } if (d->c->state == KV_STATE_GET) { @@ -391,6 +397,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else if (r == 0) { /* We have written everything */ + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); d->c->state = KV_STATE_READ_ELT; event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); @@ -403,7 +410,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } else if (d->c->state == KV_STATE_WRITE_DATA) { @@ -424,6 +431,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) else if (r == 0) { /* We have written everything */ d->c->state = KV_STATE_READ_ELT; + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { @@ -435,7 +443,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_ELT) { @@ -455,10 +463,11 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) else if (r == 0) { /* Got all data about elt */ if ((r = rspamd_kvstorage_parse_get_line (d->buf, &d->datalen, &flags)) != KVSTORAGE_ERROR_OK) { - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); return; } rspamd_kvstorage_buf_drainline (d->buf); + /* Now allocate and read the data */ databuf = rspamd_kvstorage_buf_create (d->datalen, d->c->pool); memcpy (databuf->data, d->buf->data, d->buf->pos); @@ -475,7 +484,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else { /* Error occured during reading reply line */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_DATA) { @@ -486,7 +495,6 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) d->c->state = KV_STATE_READ_REPLY; /* Save databuf */ d->data = d->buf->data; - d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_read_cb, d); if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { @@ -509,7 +517,7 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } else { /* Error occured */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_REPLY) { @@ -527,11 +535,12 @@ rspamd_kvstorage_read_cb (int fd, short what, gpointer ud) } } else if (r == 0) { - cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->data, d->datalen, d->c, d->ud); + d->c->state = KV_STATE_CONNECTED; + cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->data, d->datalen, d->c, d->ud); } else { /* Error occured during reading reply line */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, NULL, 0, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, NULL, 0, d->c, d->ud); } } } @@ -547,7 +556,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) cb = (kvstorage_write_cb)d->c->write_cb; if (what == EV_TIMEOUT) { - cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud); return; } if (d->c->state == KV_STATE_SET) { @@ -574,6 +583,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) } else if (r == 0) { /* We have written everything */ + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); d->c->state = KV_STATE_READ_REPLY; event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d); @@ -586,7 +596,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } else if (d->c->state == KV_STATE_WRITE_DATA) { @@ -607,6 +617,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) else if (r == 0) { /* We have written everything */ d->c->state = KV_STATE_READ_REPLY; + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_write_cb, d); if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { @@ -618,7 +629,7 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_REPLY) { @@ -636,11 +647,12 @@ rspamd_kvstorage_write_cb (int fd, short what, gpointer ud) } } else if (r == 0) { - cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud); + d->c->state = KV_STATE_CONNECTED; + cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud); } else { /* Error occured during reading reply line */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } } @@ -656,7 +668,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) cb = (kvstorage_write_cb)d->c->write_cb; if (what == EV_TIMEOUT) { - cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_TIMEOUT, d->key, d->keylen, d->c, d->ud); return; } if (d->c->state == KV_STATE_SET) { @@ -683,6 +695,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) } else if (r == 0) { /* We have written everything */ + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); d->c->state = KV_STATE_READ_REPLY; event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d); @@ -695,7 +708,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } else if (d->c->state == KV_STATE_WRITE_DATA) { @@ -716,6 +729,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) else if (r == 0) { /* We have written everything */ d->c->state = KV_STATE_READ_REPLY; + d->buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, d->c->pool); event_del (&d->c->ev); event_set (&d->c->ev, d->c->sock, EV_READ, rspamd_kvstorage_delete_cb, d); if (d->c->tv.tv_sec != 0 || d->c->tv.tv_usec != 0) { @@ -727,7 +741,7 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) } else { /* Error occured during writing */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } else if (d->c->state == KV_STATE_READ_REPLY) { @@ -745,11 +759,12 @@ rspamd_kvstorage_delete_cb (int fd, short what, gpointer ud) } } else if (r == 0) { - cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->c, d->ud); + d->c->state = KV_STATE_CONNECTED; + cb (rspamd_kvstorage_parse_reply_error (d->buf), d->key, d->keylen, d->c, d->ud); } else { /* Error occured during reading reply line */ - cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->c, d->ud); + cb (KVSTORAGE_ERROR_INTERNAL_ERROR, d->key, d->keylen, d->c, d->ud); } } } @@ -773,7 +788,7 @@ rspamd_kvstorage_connect_async (const gchar *host, gint sock; /* Here we do NOT try to resolve hostname */ - if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, FALSE)) == -1) { + if ((sock = make_universal_stream_socket (host, port, TRUE, FALSE, TRUE)) == -1) { return KVSTORAGE_ERROR_SERVER_ERROR; } @@ -819,7 +834,7 @@ rspamd_kvstorage_connect_async (const gchar *host, */ enum rspamd_kvstorage_error rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn, - const gpointer key, kvstorage_read_cb cb, gpointer ud) + const gpointer key, guint keylen, kvstorage_read_cb cb, gpointer ud) { struct rspamd_kvstorage_async_data *d; @@ -833,7 +848,7 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn, d->c = conn; d->ud = ud; d->key = memory_pool_strdup (conn->pool, key); - d->keylen = strlen (d->key); + d->keylen = keylen; conn->state = KV_STATE_GET; /* Set event */ @@ -858,7 +873,8 @@ rspamd_kvstorage_get_async (struct rspamd_kvstorage_connection *conn, */ enum rspamd_kvstorage_error rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn, - const gpointer key, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, gpointer ud) + const gpointer key, guint keylen, const gpointer value, gsize len, guint expire, kvstorage_write_cb cb, + gpointer ud) { struct rspamd_kvstorage_async_data *d; @@ -872,7 +888,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn, d->c = conn; d->ud = ud; d->key = memory_pool_strdup (conn->pool, key); - d->keylen = strlen (d->key); + d->keylen = keylen; d->data = value; d->datalen = len; conn->state = KV_STATE_SET; @@ -898,7 +914,7 @@ rspamd_kvstorage_set_async (struct rspamd_kvstorage_connection *conn, */ enum rspamd_kvstorage_error rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn, - const gpointer key, kvstorage_write_cb cb, gpointer ud) + const gpointer key, guint keylen, kvstorage_write_cb cb, gpointer ud) { struct rspamd_kvstorage_async_data *d; @@ -912,7 +928,7 @@ rspamd_kvstorage_delete_async (struct rspamd_kvstorage_connection *conn, d->c = conn; d->ud = ud; d->key = memory_pool_strdup (conn->pool, key); - d->keylen = strlen (d->key); + d->keylen = keylen; conn->state = KV_STATE_SET; /* Set event */ @@ -989,7 +1005,7 @@ rspamd_kvstorage_connect_sync (const gchar *host, */ enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key, gpointer **value, guint *len) + const gpointer key, guint keylen, gpointer **value, guint *len) { struct kvstorage_buf *buf, *databuf; gint r; @@ -1001,7 +1017,7 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn, buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, conn->pool); - r = rspamd_snprintf (buf->data, buf->len, "get %s" CRLF, key); + r = rspamd_snprintf (buf->data, buf->len, "get %*s" CRLF, keylen, key); buf->len = r; while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) { poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT); @@ -1051,17 +1067,16 @@ rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn, */ enum rspamd_kvstorage_error rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key, const gpointer value, gsize len, guint expire) + const gpointer key, guint keylen, const gpointer value, gsize len, guint expire) { struct kvstorage_buf *buf; - gint r, keylen, buflen; + gint r, buflen; if (conn == NULL || conn->state != KV_STATE_CONNECTED) { return KVSTORAGE_ERROR_INTERNAL_ERROR; } /* Create buf */ - keylen = strlen (key); buflen = len + keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF); buf = rspamd_kvstorage_buf_create (buflen, conn->pool); @@ -1091,17 +1106,16 @@ rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn, */ enum rspamd_kvstorage_error rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key) + const gpointer key, guint keylen) { struct kvstorage_buf *buf; - gint r, keylen, buflen; + gint r, buflen; if (conn == NULL || conn->state != KV_STATE_CONNECTED) { return KVSTORAGE_ERROR_INTERNAL_ERROR; } /* Create buf */ - keylen = strlen (key); buflen = MAX (keylen + sizeof ("delete " CRLF), MAX_KV_LINE); buf = rspamd_kvstorage_buf_create (buflen, conn->pool); @@ -1137,3 +1151,29 @@ rspamd_kvstorage_close_sync (struct rspamd_kvstorage_connection *conn) return KVSTORAGE_ERROR_OK; } + +const gchar* +rspamd_kvstorage_strerror (enum rspamd_kvstorage_error err) +{ + switch (err) { + case KVSTORAGE_ERROR_OK: + return "operation completed"; + case KVSTORAGE_ERROR_TIMEOUT: + return "operation timeout"; + case KVSTORAGE_ERROR_NOT_FOUND: + return "key not found"; + case KVSTORAGE_ERROR_NOT_STORED: + return "key not stored"; + case KVSTORAGE_ERROR_EXISTS: + return "key exists"; + case KVSTORAGE_ERROR_SERVER_ERROR: + return "server error"; + case KVSTORAGE_ERROR_CLIENT_ERROR: + return "client error"; + case KVSTORAGE_ERROR_INTERNAL_ERROR: + return "library error"; + } + + /* Not reached */ + return "unknown error"; +} |