aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-24 17:14:25 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2011-10-24 17:14:25 +0300
commite8c23daa735874d7a4a5f45d46f96c2702f7f7d1 (patch)
treeeb44295fbdca6686732aa576520c7e507129ed9a
parent8bc658b66cee9ba7184d23b195ce2f40e297034c (diff)
downloadrspamd-e8c23daa735874d7a4a5f45d46f96c2702f7f7d1.tar.gz
rspamd-e8c23daa735874d7a4a5f45d46f96c2702f7f7d1.zip
* Implement kvstorage synced API
-rw-r--r--lib/kvstorage/libkvstorageclient.c389
-rw-r--r--lib/kvstorage/libkvstorageclient.h6
-rw-r--r--src/util.h1
3 files changed, 390 insertions, 6 deletions
diff --git a/lib/kvstorage/libkvstorageclient.c b/lib/kvstorage/libkvstorageclient.c
index 6ab1ea4ce..7bbb9732d 100644
--- a/lib/kvstorage/libkvstorageclient.c
+++ b/lib/kvstorage/libkvstorageclient.c
@@ -26,6 +26,14 @@
#include "main.h"
#include "libkvstorageclient.h"
+#define MAX_KV_LINE 1024
+
+struct kvstorage_buf {
+ guint pos;
+ guint len;
+ guint8 data[1];
+};
+
struct rspamd_kvstorage_connection {
gboolean asynced;
gint sock;
@@ -49,6 +57,274 @@ struct rspamd_kvstorage_connection {
gpointer cur_value;
};
+/*
+ * Buffer functions
+ */
+
+/*
+ * Create new kvstorage_buf
+ */
+static struct kvstorage_buf *
+rspamd_kvstorage_buf_create (guint size, memory_pool_t *pool)
+{
+ struct kvstorage_buf *new;
+
+ new = memory_pool_alloc (pool, sizeof (struct kvstorage_buf) + size);
+ new->len = size;
+ new->pos = 0;
+
+ return new;
+}
+
+/*
+ * Read a single line synced or asynced
+ */
+static gint
+rspamd_kvstorage_buf_readline (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn)
+{
+ gint r;
+ guint8 *p;
+
+ r = read (conn->sock, buf->data, buf->len);
+ if (r == -1) {
+ return errno;
+ }
+ /* Try to parse what we have */
+ p = buf->data;
+ while (p - buf->data < r) {
+ if (*p == '\r' || *p == '\n') {
+
+ buf->pos = p - buf->data;
+ return 0;
+ }
+ p ++;
+ }
+
+ if (r == (gint)buf->len) {
+ /* Buffer is overflowed */
+ return EOVERFLOW;
+ }
+ /* Line end not found */
+ return EAGAIN;
+}
+
+/*
+ * Read the whole buffer, return remaining characters or -1
+ */
+static gint
+rspamd_kvstorage_buf_readall (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn)
+{
+ gint r;
+
+ if (buf->len - buf->pos == 0) {
+ return 0;
+ }
+ r = read (conn->sock, buf->data + buf->pos, buf->len - buf->pos);
+ if (r == -1) {
+ return -1;
+ }
+
+ buf->pos += r;
+
+ /* Line end not found */
+ return buf->len - buf->pos;
+}
+
+/*
+ * Write the whole buffer, return remaining characters or -1
+ */
+static gint
+rspamd_kvstorage_buf_writeall (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn)
+{
+ gint r;
+
+ if (buf->len - buf->pos == 0) {
+ return 0;
+ }
+ r = write (conn->sock, buf->data + buf->pos, buf->len - buf->pos);
+ if (r == -1) {
+ return -1;
+ }
+
+ buf->pos += r;
+
+ /* Line end not found */
+ return buf->len - buf->pos;
+}
+
+/*
+ * Drain line from the begin of buffer, moving it from the beginning of buf
+ */
+static void
+rspamd_kvstorage_buf_drainline (struct kvstorage_buf *buf)
+{
+ guint8 *p;
+
+ p = buf->data + buf->pos;
+ /* Skip \r and \n characters */
+ while (p - buf->data < buf->len && (*p == '\r' || *p == '\n')) {
+ p ++;
+ }
+ if (p - buf->data == buf->len) {
+ /* Do not move anything */
+ buf->pos = 0;
+ return;
+ }
+ memcpy (buf->data, p, buf->len - (p - buf->data));
+ buf->pos = buf->len - (p - buf->data);
+}
+
+/* Common utility functions */
+
+/*
+ * Parse reply line that contains an error
+ */
+static enum rspamd_kvstorage_error
+rspamd_kvstorage_parse_reply_error (struct kvstorage_buf *buf)
+{
+ guint8 *p;
+ guint l = 0;
+
+ /* Get one word */
+ p = buf->data;
+ while (p - buf->data < buf->pos) {
+ if (g_ascii_isspace (*p)) {
+ while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+ p ++;
+ }
+ break;
+ }
+ p ++;
+ l ++;
+ }
+
+ /* Get common errors */
+ if (g_ascii_strncasecmp (buf->data, "ERROR", MIN (l, sizeof("ERORR") - 1)) == 0) {
+ return KVSTORAGE_ERROR_INTERNAL_ERROR;
+ }
+ else if (g_ascii_strncasecmp (buf->data, "SERVER_ERROR", MIN (l, sizeof("SERVER_ERORR") - 1)) == 0) {
+ return KVSTORAGE_ERROR_SERVER_ERROR;
+ }
+ else if (g_ascii_strncasecmp (buf->data, "CLIENT_ERROR", MIN (l, sizeof("CLIENT_ERORR") - 1)) == 0) {
+ return KVSTORAGE_ERROR_CLIENT_ERROR;
+ }
+ else if (g_ascii_strncasecmp (buf->data, "NOT_STORED", MIN (l, sizeof("NOT_STORED") - 1)) == 0) {
+ return KVSTORAGE_ERROR_NOT_STORED;
+ }
+ else if (g_ascii_strncasecmp (buf->data, "NOT_FOUND", MIN (l, sizeof("NOT_FOUND") - 1)) == 0) {
+ return KVSTORAGE_ERROR_NOT_FOUND;
+ }
+ else if (g_ascii_strncasecmp (buf->data, "EXISTS", MIN (l, sizeof("EXISTS") - 1)) == 0) {
+ return KVSTORAGE_ERROR_EXISTS;
+ }
+ else if (g_ascii_strncasecmp (buf->data, "STORED", MIN (l, sizeof("STORED") - 1)) == 0) {
+ return KVSTORAGE_ERROR_OK;
+ }
+ else if (g_ascii_strncasecmp (buf->data, "DELETED", MIN (l, sizeof("DELETED") - 1)) == 0) {
+ return KVSTORAGE_ERROR_OK;
+ }
+
+ return KVSTORAGE_ERROR_INTERNAL_ERROR;
+}
+
+/*
+ * Parse reply line, store element length
+ */
+static enum rspamd_kvstorage_error
+rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *flags)
+{
+ guint8 *p, *c;
+ gboolean error = TRUE;
+ gchar *err_str;
+
+ p = buf->data;
+ while (p - buf->data < buf->pos) {
+ if (g_ascii_isspace (*p)) {
+ error = FALSE;
+ while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+ p ++;
+ }
+ break;
+ }
+ p ++;
+ }
+ /* Here we got a word or error flag */
+ if (error) {
+ /* Something wrong here */
+ return KVSTORAGE_ERROR_SERVER_ERROR;
+ }
+ if (g_ascii_strncasecmp (buf->data, "VALUE", sizeof ("VALUE") - 1) != 0) {
+ return rspamd_kvstorage_parse_reply_error (buf);
+ }
+ /* Here we got key, flags and size items */
+ /* Skip key */
+ error = TRUE;
+ while (p - buf->data < buf->pos) {
+ if (g_ascii_isspace (*p)) {
+ error = FALSE;
+ while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+ p ++;
+ }
+ break;
+ }
+ p ++;
+ }
+ if (error) {
+ /* Something wrong here */
+ return KVSTORAGE_ERROR_SERVER_ERROR;
+ }
+ /* Read flags */
+ c = p;
+ error = TRUE;
+ while (p - buf->data < buf->pos) {
+ if (g_ascii_isspace (*p)) {
+ error = FALSE;
+ while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+ p ++;
+ }
+ break;
+ }
+ else if (!g_ascii_isdigit (*p)) {
+ break;
+ }
+ p ++;
+ }
+ if (error) {
+ /* Something wrong here */
+ return KVSTORAGE_ERROR_SERVER_ERROR;
+ }
+ *flags = strtoul (c, &err_str, 10);
+ if (!g_ascii_isspace (*err_str)) {
+ return KVSTORAGE_ERROR_SERVER_ERROR;
+ }
+ /* Read len */
+ c = p;
+ error = TRUE;
+ while (p - buf->data < buf->pos) {
+ if (g_ascii_isspace (*p)) {
+ error = FALSE;
+ while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+ p ++;
+ }
+ break;
+ }
+ else if (!g_ascii_isdigit (*p)) {
+ break;
+ }
+ p ++;
+ }
+ if (error) {
+ /* Something wrong here */
+ return KVSTORAGE_ERROR_SERVER_ERROR;
+ }
+ *len = strtoul (c, &err_str, 10);
+ if (!g_ascii_isspace (*err_str)) {
+ return KVSTORAGE_ERROR_SERVER_ERROR;
+ }
+
+ return KVSTORAGE_ERROR_OK;
+}
+
/* Callbacks for async API */
static void
rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud)
@@ -284,7 +560,7 @@ rspamd_kvstorage_connect_sync (const gchar *host,
/* Set fields */
new->sock = sock;
- new->state = KV_STATE_NONE;
+ new->state = KV_STATE_CONNECTED;
new->asynced = FALSE;
if (tv != NULL) {
memcpy (&new->tv, tv, sizeof (struct timeval));
@@ -306,8 +582,57 @@ rspamd_kvstorage_connect_sync (const gchar *host,
*/
enum rspamd_kvstorage_error
rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key, gpointer **value)
+ const gpointer key, gpointer **value, guint *len)
{
+ struct kvstorage_buf *buf, *databuf;
+ gint r;
+ guint flags;
+
+ if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
+ return KVSTORAGE_ERROR_INTERNAL_ERROR;
+ }
+
+ buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, conn->pool);
+
+ r = rspamd_snprintf (buf->data, buf->len, "get %s" CRLF, key);
+ buf->len = r;
+ while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
+ poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
+ }
+
+ if (r == -1) {
+ return KVSTORAGE_ERROR_INTERNAL_ERROR;
+ }
+ /* Now read reply and try to parse line */
+ buf->len = MAX_KV_LINE;
+ buf->pos = 0;
+ while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
+ poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+ }
+ /* A line was read */
+ if (r == 0) {
+ if ((r = rspamd_kvstorage_parse_get_line (buf, len, &flags)) != KVSTORAGE_ERROR_OK) {
+ return r;
+ }
+ rspamd_kvstorage_buf_drainline (buf);
+ /* Now allocate and read the data */
+ databuf = rspamd_kvstorage_buf_create (*len, conn->pool);
+ memcpy (databuf->data, buf->data, buf->pos);
+ while ((r = rspamd_kvstorage_buf_readall (databuf, conn)) > 0) {
+ poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+ }
+ if (r == -1) {
+ return KVSTORAGE_ERROR_INTERNAL_ERROR;
+ }
+ /* Now we have data inside buffer, read the last line */
+ buf->pos = 0;
+ while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
+ poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+ }
+ *value = (gpointer)buf->data;
+ }
+
+
return KVSTORAGE_ERROR_OK;
}
@@ -321,7 +646,35 @@ enum rspamd_kvstorage_error
rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
const gpointer key, const gpointer value, gsize len, guint expire)
{
- return KVSTORAGE_ERROR_OK;
+ struct kvstorage_buf *buf;
+ gint r, keylen, buflen;
+
+ if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
+ return KVSTORAGE_ERROR_INTERNAL_ERROR;
+ }
+
+ /* Create buf */
+ keylen = strlen (key);
+ buflen = len + keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF);
+ buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
+
+ r = rspamd_snprintf (buf->data, buf->len, "set %*s %ud %ud %ud" CRLF "%*s",
+ keylen, key, 0, expire, len, len, value);
+ buf->len = r;
+ while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
+ poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
+ }
+ if (r == -1) {
+ return KVSTORAGE_ERROR_INTERNAL_ERROR;
+ }
+ /* Now we can read reply */
+ buf->pos = 0;
+ buf->len = buflen;
+ while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
+ poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+ }
+
+ return rspamd_kvstorage_parse_reply_error (buf);
}
/**
@@ -333,7 +686,35 @@ enum rspamd_kvstorage_error
rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn,
const gpointer key)
{
- return KVSTORAGE_ERROR_OK;
+ struct kvstorage_buf *buf;
+ gint r, keylen, buflen;
+
+ if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
+ return KVSTORAGE_ERROR_INTERNAL_ERROR;
+ }
+
+ /* Create buf */
+ keylen = strlen (key);
+ buflen = MAX (keylen + sizeof ("delete" CRLF), MAX_KV_LINE);
+ buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
+
+ r = rspamd_snprintf (buf->data, buf->len, "delete %*s" CRLF,
+ keylen, key);
+ buf->len = r;
+ while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
+ poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
+ }
+ if (r == -1) {
+ return KVSTORAGE_ERROR_INTERNAL_ERROR;
+ }
+ /* Now we can read reply */
+ buf->len = buflen;
+ buf->pos = 0;
+ while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
+ poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+ }
+
+ return rspamd_kvstorage_parse_reply_error (buf);
}
/**
diff --git a/lib/kvstorage/libkvstorageclient.h b/lib/kvstorage/libkvstorageclient.h
index 788303adc..262c5e89e 100644
--- a/lib/kvstorage/libkvstorageclient.h
+++ b/lib/kvstorage/libkvstorageclient.h
@@ -31,9 +31,11 @@
enum rspamd_kvstorage_error {
KVSTORAGE_ERROR_OK = 0,
KVSTORAGE_ERROR_TIMEOUT,
- KVSTORAGE_ERROR_NOT_EXIST,
+ KVSTORAGE_ERROR_NOT_FOUND,
KVSTORAGE_ERROR_NOT_STORED,
+ KVSTORAGE_ERROR_EXISTS,
KVSTORAGE_ERROR_SERVER_ERROR,
+ KVSTORAGE_ERROR_CLIENT_ERROR,
KVSTORAGE_ERROR_INTERNAL_ERROR
};
@@ -120,7 +122,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_connect_sync (const gchar *host,
* @param value value readed
*/
enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
- const gpointer key, gpointer **value);
+ const gpointer key, gpointer **value, guint *len);
/**
* Write key synced
diff --git a/src/util.h b/src/util.h
index 7bdf3bdc4..0fb6f48bf 100644
--- a/src/util.h
+++ b/src/util.h
@@ -226,6 +226,7 @@ gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in);
* Convert milliseconds to timeval fields
*/
#define msec_to_tv(msec, tv) do { (tv)->tv_sec = (msec) / 1000; (tv)->tv_usec = ((msec) - (tv)->tv_sec * 1000) * 1000; } while(0)
+#define tv_to_msec(tv) (tv)->tv_sec * 1000 + (tv)->tv_usec / 1000
struct worker_task;
struct rspamd_worker;