]> source.dussan.org Git - rspamd.git/commitdiff
* Implement kvstorage synced API
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 24 Oct 2011 14:14:25 +0000 (17:14 +0300)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 24 Oct 2011 14:14:25 +0000 (17:14 +0300)
lib/kvstorage/libkvstorageclient.c
lib/kvstorage/libkvstorageclient.h
src/util.h

index 6ab1ea4ce62f2a44f1274172558a518323891310..7bbb9732db70cfd825bb9e0307d6765eee1d38d7 100644 (file)
 #include "main.h"
 #include "libkvstorageclient.h"
 
+#define MAX_KV_LINE 1024
+
+struct kvstorage_buf {
+       guint pos;
+       guint len;
+       guint8 data[1];
+};
+
 struct rspamd_kvstorage_connection {
        gboolean asynced;
        gint sock;
@@ -49,6 +57,274 @@ struct rspamd_kvstorage_connection {
        gpointer cur_value;
 };
 
+/*
+ * Buffer functions
+ */
+
+/*
+ * Create new kvstorage_buf
+ */
+static struct kvstorage_buf *
+rspamd_kvstorage_buf_create (guint size, memory_pool_t *pool)
+{
+       struct kvstorage_buf                                    *new;
+
+       new = memory_pool_alloc (pool, sizeof (struct kvstorage_buf) + size);
+       new->len = size;
+       new->pos = 0;
+
+       return new;
+}
+
+/*
+ * Read a single line synced or asynced
+ */
+static gint
+rspamd_kvstorage_buf_readline (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn)
+{
+       gint                                                                    r;
+       guint8                                                             *p;
+
+       r = read (conn->sock, buf->data, buf->len);
+       if (r == -1) {
+               return errno;
+       }
+       /* Try to parse what we have */
+       p = buf->data;
+       while (p - buf->data < r) {
+               if (*p == '\r' || *p == '\n') {
+
+                       buf->pos = p - buf->data;
+                       return 0;
+               }
+               p ++;
+       }
+
+       if (r == (gint)buf->len) {
+               /* Buffer is overflowed */
+               return EOVERFLOW;
+       }
+       /* Line end not found */
+       return EAGAIN;
+}
+
+/*
+ * Read the whole buffer, return remaining characters or -1
+ */
+static gint
+rspamd_kvstorage_buf_readall (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn)
+{
+       gint                                                                    r;
+
+       if (buf->len - buf->pos == 0) {
+               return 0;
+       }
+       r = read (conn->sock, buf->data + buf->pos, buf->len - buf->pos);
+       if (r == -1) {
+               return -1;
+       }
+
+       buf->pos += r;
+
+       /* Line end not found */
+       return buf->len - buf->pos;
+}
+
+/*
+ * Write the whole buffer, return remaining characters or -1
+ */
+static gint
+rspamd_kvstorage_buf_writeall (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn)
+{
+       gint                                                                    r;
+
+       if (buf->len - buf->pos == 0) {
+               return 0;
+       }
+       r = write (conn->sock, buf->data + buf->pos, buf->len - buf->pos);
+       if (r == -1) {
+               return -1;
+       }
+
+       buf->pos += r;
+
+       /* Line end not found */
+       return buf->len - buf->pos;
+}
+
+/*
+ * Drain line from the begin of buffer, moving it from the beginning of buf
+ */
+static void
+rspamd_kvstorage_buf_drainline (struct kvstorage_buf *buf)
+{
+       guint8                                                             *p;
+
+       p = buf->data + buf->pos;
+       /* Skip \r and \n characters */
+       while (p - buf->data < buf->len && (*p == '\r' || *p == '\n')) {
+               p ++;
+       }
+       if (p - buf->data == buf->len) {
+               /* Do not move anything */
+               buf->pos = 0;
+               return;
+       }
+       memcpy (buf->data, p, buf->len - (p - buf->data));
+       buf->pos = buf->len - (p - buf->data);
+}
+
+/* Common utility functions */
+
+/*
+ * Parse reply line that contains an error
+ */
+static enum rspamd_kvstorage_error
+rspamd_kvstorage_parse_reply_error (struct kvstorage_buf *buf)
+{
+       guint8                                                             *p;
+       guint                                   l = 0;
+
+       /* Get one word */
+       p = buf->data;
+       while (p - buf->data < buf->pos) {
+               if (g_ascii_isspace (*p)) {
+                       while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+                               p ++;
+                       }
+                       break;
+               }
+               p ++;
+               l ++;
+       }
+
+       /* Get common errors */
+       if (g_ascii_strncasecmp (buf->data, "ERROR", MIN (l, sizeof("ERORR") - 1)) == 0) {
+               return KVSTORAGE_ERROR_INTERNAL_ERROR;
+       }
+       else if (g_ascii_strncasecmp (buf->data, "SERVER_ERROR", MIN (l, sizeof("SERVER_ERORR") - 1)) == 0) {
+               return KVSTORAGE_ERROR_SERVER_ERROR;
+       }
+       else if (g_ascii_strncasecmp (buf->data, "CLIENT_ERROR", MIN (l, sizeof("CLIENT_ERORR") - 1)) == 0) {
+               return KVSTORAGE_ERROR_CLIENT_ERROR;
+       }
+       else if (g_ascii_strncasecmp (buf->data, "NOT_STORED", MIN (l, sizeof("NOT_STORED") - 1)) == 0) {
+               return KVSTORAGE_ERROR_NOT_STORED;
+       }
+       else if (g_ascii_strncasecmp (buf->data, "NOT_FOUND", MIN (l, sizeof("NOT_FOUND") - 1)) == 0) {
+               return KVSTORAGE_ERROR_NOT_FOUND;
+       }
+       else if (g_ascii_strncasecmp (buf->data, "EXISTS", MIN (l, sizeof("EXISTS") - 1)) == 0) {
+               return KVSTORAGE_ERROR_EXISTS;
+       }
+       else if (g_ascii_strncasecmp (buf->data, "STORED", MIN (l, sizeof("STORED") - 1)) == 0) {
+               return KVSTORAGE_ERROR_OK;
+       }
+       else if (g_ascii_strncasecmp (buf->data, "DELETED", MIN (l, sizeof("DELETED") - 1)) == 0) {
+               return KVSTORAGE_ERROR_OK;
+       }
+
+       return KVSTORAGE_ERROR_INTERNAL_ERROR;
+}
+
+/*
+ * Parse reply line, store element length
+ */
+static enum rspamd_kvstorage_error
+rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *flags)
+{
+       guint8                                                             *p, *c;
+       gboolean                                error = TRUE;
+       gchar                                  *err_str;
+
+       p = buf->data;
+       while (p - buf->data < buf->pos) {
+               if (g_ascii_isspace (*p)) {
+                       error = FALSE;
+                       while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+                               p ++;
+                       }
+                       break;
+               }
+               p ++;
+       }
+       /* Here we got a word or error flag */
+       if (error) {
+               /* Something wrong here */
+               return KVSTORAGE_ERROR_SERVER_ERROR;
+       }
+       if (g_ascii_strncasecmp (buf->data, "VALUE", sizeof ("VALUE") - 1) != 0) {
+               return rspamd_kvstorage_parse_reply_error (buf);
+       }
+       /* Here we got key, flags and size items */
+       /* Skip key */
+       error = TRUE;
+       while (p - buf->data < buf->pos) {
+               if (g_ascii_isspace (*p)) {
+                       error = FALSE;
+                       while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+                               p ++;
+                       }
+                       break;
+               }
+               p ++;
+       }
+       if (error) {
+               /* Something wrong here */
+               return KVSTORAGE_ERROR_SERVER_ERROR;
+       }
+       /* Read flags */
+       c = p;
+       error = TRUE;
+       while (p - buf->data < buf->pos) {
+               if (g_ascii_isspace (*p)) {
+                       error = FALSE;
+                       while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+                               p ++;
+                       }
+                       break;
+               }
+               else if (!g_ascii_isdigit (*p)) {
+                       break;
+               }
+               p ++;
+       }
+       if (error) {
+               /* Something wrong here */
+               return KVSTORAGE_ERROR_SERVER_ERROR;
+       }
+       *flags = strtoul (c, &err_str, 10);
+       if (!g_ascii_isspace (*err_str)) {
+               return KVSTORAGE_ERROR_SERVER_ERROR;
+       }
+       /* Read len */
+       c = p;
+       error = TRUE;
+       while (p - buf->data < buf->pos) {
+               if (g_ascii_isspace (*p)) {
+                       error = FALSE;
+                       while (p - buf->data < buf->pos && g_ascii_isspace (*p)) {
+                               p ++;
+                       }
+                       break;
+               }
+               else if (!g_ascii_isdigit (*p)) {
+                       break;
+               }
+               p ++;
+       }
+       if (error) {
+               /* Something wrong here */
+               return KVSTORAGE_ERROR_SERVER_ERROR;
+       }
+       *len = strtoul (c, &err_str, 10);
+       if (!g_ascii_isspace (*err_str)) {
+               return KVSTORAGE_ERROR_SERVER_ERROR;
+       }
+
+       return KVSTORAGE_ERROR_OK;
+}
+
 /* Callbacks for async API */
 static void
 rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud)
@@ -284,7 +560,7 @@ rspamd_kvstorage_connect_sync (const gchar *host,
 
        /* Set fields */
        new->sock = sock;
-       new->state = KV_STATE_NONE;
+       new->state = KV_STATE_CONNECTED;
        new->asynced = FALSE;
        if (tv != NULL) {
                memcpy (&new->tv, tv, sizeof (struct timeval));
@@ -306,8 +582,57 @@ rspamd_kvstorage_connect_sync (const gchar *host,
  */
 enum rspamd_kvstorage_error
 rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, gpointer **value)
+               const gpointer key, gpointer **value, guint *len)
 {
+       struct kvstorage_buf                               *buf, *databuf;
+       gint                                    r;
+       guint                                                                   flags;
+
+       if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
+               return KVSTORAGE_ERROR_INTERNAL_ERROR;
+       }
+
+       buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, conn->pool);
+
+       r = rspamd_snprintf (buf->data, buf->len, "get %s" CRLF, key);
+       buf->len = r;
+       while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
+               poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
+       }
+
+       if (r == -1) {
+               return KVSTORAGE_ERROR_INTERNAL_ERROR;
+       }
+       /* Now read reply and try to parse line */
+       buf->len = MAX_KV_LINE;
+       buf->pos = 0;
+       while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
+               poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+       }
+       /* A line was read */
+       if (r == 0) {
+               if ((r = rspamd_kvstorage_parse_get_line (buf, len, &flags)) != KVSTORAGE_ERROR_OK) {
+                       return r;
+               }
+               rspamd_kvstorage_buf_drainline (buf);
+               /* Now allocate and read the data */
+               databuf = rspamd_kvstorage_buf_create (*len, conn->pool);
+               memcpy (databuf->data, buf->data, buf->pos);
+               while ((r = rspamd_kvstorage_buf_readall (databuf, conn)) > 0) {
+                       poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+               }
+               if (r == -1) {
+                       return KVSTORAGE_ERROR_INTERNAL_ERROR;
+               }
+               /* Now we have data inside buffer, read the last line */
+               buf->pos = 0;
+               while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
+                       poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+               }
+               *value = (gpointer)buf->data;
+       }
+
+
        return KVSTORAGE_ERROR_OK;
 }
 
@@ -321,7 +646,35 @@ enum rspamd_kvstorage_error
 rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn,
                const gpointer key, const gpointer value, gsize len, guint expire)
 {
-       return KVSTORAGE_ERROR_OK;
+       struct kvstorage_buf                               *buf;
+       gint                                    r, keylen, buflen;
+
+       if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
+               return KVSTORAGE_ERROR_INTERNAL_ERROR;
+       }
+
+       /* Create buf */
+       keylen = strlen (key);
+       buflen = len + keylen + sizeof ("set  4294967296 4294967296 4294967296" CRLF);
+       buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
+
+       r = rspamd_snprintf (buf->data, buf->len, "set %*s %ud %ud %ud" CRLF "%*s",
+                       keylen, key, 0, expire, len, len, value);
+       buf->len = r;
+       while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
+               poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
+       }
+       if (r == -1) {
+               return KVSTORAGE_ERROR_INTERNAL_ERROR;
+       }
+       /* Now we can read reply */
+       buf->pos = 0;
+       buf->len = buflen;
+       while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
+               poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+       }
+
+       return rspamd_kvstorage_parse_reply_error (buf);
 }
 
 /**
@@ -333,7 +686,35 @@ enum rspamd_kvstorage_error
 rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn,
                const gpointer key)
 {
-       return KVSTORAGE_ERROR_OK;
+       struct kvstorage_buf                               *buf;
+       gint                                    r, keylen, buflen;
+
+       if (conn == NULL || conn->state != KV_STATE_CONNECTED) {
+               return KVSTORAGE_ERROR_INTERNAL_ERROR;
+       }
+
+       /* Create buf */
+       keylen = strlen (key);
+       buflen = MAX (keylen + sizeof ("delete" CRLF), MAX_KV_LINE);
+       buf = rspamd_kvstorage_buf_create (buflen, conn->pool);
+
+       r = rspamd_snprintf (buf->data, buf->len, "delete %*s" CRLF,
+                       keylen, key);
+       buf->len = r;
+       while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) {
+               poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT);
+       }
+       if (r == -1) {
+               return KVSTORAGE_ERROR_INTERNAL_ERROR;
+       }
+       /* Now we can read reply */
+       buf->len = buflen;
+       buf->pos = 0;
+       while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) {
+               poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN);
+       }
+
+       return rspamd_kvstorage_parse_reply_error (buf);
 }
 
 /**
index 788303adc975c036c09e266589b4d8409bdbe242..262c5e89e14392ea47870bb97e460ec08ce83cf0 100644 (file)
 enum rspamd_kvstorage_error {
        KVSTORAGE_ERROR_OK = 0,
        KVSTORAGE_ERROR_TIMEOUT,
-       KVSTORAGE_ERROR_NOT_EXIST,
+       KVSTORAGE_ERROR_NOT_FOUND,
        KVSTORAGE_ERROR_NOT_STORED,
+       KVSTORAGE_ERROR_EXISTS,
        KVSTORAGE_ERROR_SERVER_ERROR,
+       KVSTORAGE_ERROR_CLIENT_ERROR,
        KVSTORAGE_ERROR_INTERNAL_ERROR
 };
 
@@ -120,7 +122,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_connect_sync (const gchar *host,
  * @param value value readed
  */
 enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn,
-               const gpointer key, gpointer **value);
+               const gpointer key, gpointer **value, guint *len);
 
 /**
  * Write key synced
index 7bdf3bdc45c0403eea2aa23d55b74d56d4674b79..0fb6f48bffc52f7730f49fa1f28fcf755a74d106 100644 (file)
@@ -226,6 +226,7 @@ gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in);
  * Convert milliseconds to timeval fields
  */
 #define msec_to_tv(msec, tv) do { (tv)->tv_sec = (msec) / 1000; (tv)->tv_usec = ((msec) - (tv)->tv_sec * 1000) * 1000; } while(0)
+#define tv_to_msec(tv) (tv)->tv_sec * 1000 + (tv)->tv_usec / 1000
 
 struct worker_task;
 struct rspamd_worker;