diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-24 17:14:25 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2011-10-24 17:14:25 +0300 |
commit | e8c23daa735874d7a4a5f45d46f96c2702f7f7d1 (patch) | |
tree | eb44295fbdca6686732aa576520c7e507129ed9a | |
parent | 8bc658b66cee9ba7184d23b195ce2f40e297034c (diff) | |
download | rspamd-e8c23daa735874d7a4a5f45d46f96c2702f7f7d1.tar.gz rspamd-e8c23daa735874d7a4a5f45d46f96c2702f7f7d1.zip |
* Implement kvstorage synced API
-rw-r--r-- | lib/kvstorage/libkvstorageclient.c | 389 | ||||
-rw-r--r-- | lib/kvstorage/libkvstorageclient.h | 6 | ||||
-rw-r--r-- | src/util.h | 1 |
3 files changed, 390 insertions, 6 deletions
diff --git a/lib/kvstorage/libkvstorageclient.c b/lib/kvstorage/libkvstorageclient.c index 6ab1ea4ce..7bbb9732d 100644 --- a/lib/kvstorage/libkvstorageclient.c +++ b/lib/kvstorage/libkvstorageclient.c @@ -26,6 +26,14 @@ #include "main.h" #include "libkvstorageclient.h" +#define MAX_KV_LINE 1024 + +struct kvstorage_buf { + guint pos; + guint len; + guint8 data[1]; +}; + struct rspamd_kvstorage_connection { gboolean asynced; gint sock; @@ -49,6 +57,274 @@ struct rspamd_kvstorage_connection { gpointer cur_value; }; +/* + * Buffer functions + */ + +/* + * Create new kvstorage_buf + */ +static struct kvstorage_buf * +rspamd_kvstorage_buf_create (guint size, memory_pool_t *pool) +{ + struct kvstorage_buf *new; + + new = memory_pool_alloc (pool, sizeof (struct kvstorage_buf) + size); + new->len = size; + new->pos = 0; + + return new; +} + +/* + * Read a single line synced or asynced + */ +static gint +rspamd_kvstorage_buf_readline (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn) +{ + gint r; + guint8 *p; + + r = read (conn->sock, buf->data, buf->len); + if (r == -1) { + return errno; + } + /* Try to parse what we have */ + p = buf->data; + while (p - buf->data < r) { + if (*p == '\r' || *p == '\n') { + + buf->pos = p - buf->data; + return 0; + } + p ++; + } + + if (r == (gint)buf->len) { + /* Buffer is overflowed */ + return EOVERFLOW; + } + /* Line end not found */ + return EAGAIN; +} + +/* + * Read the whole buffer, return remaining characters or -1 + */ +static gint +rspamd_kvstorage_buf_readall (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn) +{ + gint r; + + if (buf->len - buf->pos == 0) { + return 0; + } + r = read (conn->sock, buf->data + buf->pos, buf->len - buf->pos); + if (r == -1) { + return -1; + } + + buf->pos += r; + + /* Line end not found */ + return buf->len - buf->pos; +} + +/* + * Write the whole buffer, return remaining characters or -1 + */ +static gint +rspamd_kvstorage_buf_writeall (struct kvstorage_buf *buf, struct rspamd_kvstorage_connection *conn) +{ + gint r; + + if (buf->len - buf->pos == 0) { + return 0; + } + r = write (conn->sock, buf->data + buf->pos, buf->len - buf->pos); + if (r == -1) { + return -1; + } + + buf->pos += r; + + /* Line end not found */ + return buf->len - buf->pos; +} + +/* + * Drain line from the begin of buffer, moving it from the beginning of buf + */ +static void +rspamd_kvstorage_buf_drainline (struct kvstorage_buf *buf) +{ + guint8 *p; + + p = buf->data + buf->pos; + /* Skip \r and \n characters */ + while (p - buf->data < buf->len && (*p == '\r' || *p == '\n')) { + p ++; + } + if (p - buf->data == buf->len) { + /* Do not move anything */ + buf->pos = 0; + return; + } + memcpy (buf->data, p, buf->len - (p - buf->data)); + buf->pos = buf->len - (p - buf->data); +} + +/* Common utility functions */ + +/* + * Parse reply line that contains an error + */ +static enum rspamd_kvstorage_error +rspamd_kvstorage_parse_reply_error (struct kvstorage_buf *buf) +{ + guint8 *p; + guint l = 0; + + /* Get one word */ + p = buf->data; + while (p - buf->data < buf->pos) { + if (g_ascii_isspace (*p)) { + while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { + p ++; + } + break; + } + p ++; + l ++; + } + + /* Get common errors */ + if (g_ascii_strncasecmp (buf->data, "ERROR", MIN (l, sizeof("ERORR") - 1)) == 0) { + return KVSTORAGE_ERROR_INTERNAL_ERROR; + } + else if (g_ascii_strncasecmp (buf->data, "SERVER_ERROR", MIN (l, sizeof("SERVER_ERORR") - 1)) == 0) { + return KVSTORAGE_ERROR_SERVER_ERROR; + } + else if (g_ascii_strncasecmp (buf->data, "CLIENT_ERROR", MIN (l, sizeof("CLIENT_ERORR") - 1)) == 0) { + return KVSTORAGE_ERROR_CLIENT_ERROR; + } + else if (g_ascii_strncasecmp (buf->data, "NOT_STORED", MIN (l, sizeof("NOT_STORED") - 1)) == 0) { + return KVSTORAGE_ERROR_NOT_STORED; + } + else if (g_ascii_strncasecmp (buf->data, "NOT_FOUND", MIN (l, sizeof("NOT_FOUND") - 1)) == 0) { + return KVSTORAGE_ERROR_NOT_FOUND; + } + else if (g_ascii_strncasecmp (buf->data, "EXISTS", MIN (l, sizeof("EXISTS") - 1)) == 0) { + return KVSTORAGE_ERROR_EXISTS; + } + else if (g_ascii_strncasecmp (buf->data, "STORED", MIN (l, sizeof("STORED") - 1)) == 0) { + return KVSTORAGE_ERROR_OK; + } + else if (g_ascii_strncasecmp (buf->data, "DELETED", MIN (l, sizeof("DELETED") - 1)) == 0) { + return KVSTORAGE_ERROR_OK; + } + + return KVSTORAGE_ERROR_INTERNAL_ERROR; +} + +/* + * Parse reply line, store element length + */ +static enum rspamd_kvstorage_error +rspamd_kvstorage_parse_get_line (struct kvstorage_buf *buf, guint *len, guint *flags) +{ + guint8 *p, *c; + gboolean error = TRUE; + gchar *err_str; + + p = buf->data; + while (p - buf->data < buf->pos) { + if (g_ascii_isspace (*p)) { + error = FALSE; + while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { + p ++; + } + break; + } + p ++; + } + /* Here we got a word or error flag */ + if (error) { + /* Something wrong here */ + return KVSTORAGE_ERROR_SERVER_ERROR; + } + if (g_ascii_strncasecmp (buf->data, "VALUE", sizeof ("VALUE") - 1) != 0) { + return rspamd_kvstorage_parse_reply_error (buf); + } + /* Here we got key, flags and size items */ + /* Skip key */ + error = TRUE; + while (p - buf->data < buf->pos) { + if (g_ascii_isspace (*p)) { + error = FALSE; + while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { + p ++; + } + break; + } + p ++; + } + if (error) { + /* Something wrong here */ + return KVSTORAGE_ERROR_SERVER_ERROR; + } + /* Read flags */ + c = p; + error = TRUE; + while (p - buf->data < buf->pos) { + if (g_ascii_isspace (*p)) { + error = FALSE; + while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { + p ++; + } + break; + } + else if (!g_ascii_isdigit (*p)) { + break; + } + p ++; + } + if (error) { + /* Something wrong here */ + return KVSTORAGE_ERROR_SERVER_ERROR; + } + *flags = strtoul (c, &err_str, 10); + if (!g_ascii_isspace (*err_str)) { + return KVSTORAGE_ERROR_SERVER_ERROR; + } + /* Read len */ + c = p; + error = TRUE; + while (p - buf->data < buf->pos) { + if (g_ascii_isspace (*p)) { + error = FALSE; + while (p - buf->data < buf->pos && g_ascii_isspace (*p)) { + p ++; + } + break; + } + else if (!g_ascii_isdigit (*p)) { + break; + } + p ++; + } + if (error) { + /* Something wrong here */ + return KVSTORAGE_ERROR_SERVER_ERROR; + } + *len = strtoul (c, &err_str, 10); + if (!g_ascii_isspace (*err_str)) { + return KVSTORAGE_ERROR_SERVER_ERROR; + } + + return KVSTORAGE_ERROR_OK; +} + /* Callbacks for async API */ static void rspamd_kvstorage_connect_cb (int fd, short what, gpointer ud) @@ -284,7 +560,7 @@ rspamd_kvstorage_connect_sync (const gchar *host, /* Set fields */ new->sock = sock; - new->state = KV_STATE_NONE; + new->state = KV_STATE_CONNECTED; new->asynced = FALSE; if (tv != NULL) { memcpy (&new->tv, tv, sizeof (struct timeval)); @@ -306,8 +582,57 @@ rspamd_kvstorage_connect_sync (const gchar *host, */ enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key, gpointer **value) + const gpointer key, gpointer **value, guint *len) { + struct kvstorage_buf *buf, *databuf; + gint r; + guint flags; + + if (conn == NULL || conn->state != KV_STATE_CONNECTED) { + return KVSTORAGE_ERROR_INTERNAL_ERROR; + } + + buf = rspamd_kvstorage_buf_create (MAX_KV_LINE, conn->pool); + + r = rspamd_snprintf (buf->data, buf->len, "get %s" CRLF, key); + buf->len = r; + while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) { + poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT); + } + + if (r == -1) { + return KVSTORAGE_ERROR_INTERNAL_ERROR; + } + /* Now read reply and try to parse line */ + buf->len = MAX_KV_LINE; + buf->pos = 0; + while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) { + poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN); + } + /* A line was read */ + if (r == 0) { + if ((r = rspamd_kvstorage_parse_get_line (buf, len, &flags)) != KVSTORAGE_ERROR_OK) { + return r; + } + rspamd_kvstorage_buf_drainline (buf); + /* Now allocate and read the data */ + databuf = rspamd_kvstorage_buf_create (*len, conn->pool); + memcpy (databuf->data, buf->data, buf->pos); + while ((r = rspamd_kvstorage_buf_readall (databuf, conn)) > 0) { + poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN); + } + if (r == -1) { + return KVSTORAGE_ERROR_INTERNAL_ERROR; + } + /* Now we have data inside buffer, read the last line */ + buf->pos = 0; + while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) { + poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN); + } + *value = (gpointer)buf->data; + } + + return KVSTORAGE_ERROR_OK; } @@ -321,7 +646,35 @@ enum rspamd_kvstorage_error rspamd_kvstorage_set_sync (struct rspamd_kvstorage_connection *conn, const gpointer key, const gpointer value, gsize len, guint expire) { - return KVSTORAGE_ERROR_OK; + struct kvstorage_buf *buf; + gint r, keylen, buflen; + + if (conn == NULL || conn->state != KV_STATE_CONNECTED) { + return KVSTORAGE_ERROR_INTERNAL_ERROR; + } + + /* Create buf */ + keylen = strlen (key); + buflen = len + keylen + sizeof ("set 4294967296 4294967296 4294967296" CRLF); + buf = rspamd_kvstorage_buf_create (buflen, conn->pool); + + r = rspamd_snprintf (buf->data, buf->len, "set %*s %ud %ud %ud" CRLF "%*s", + keylen, key, 0, expire, len, len, value); + buf->len = r; + while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) { + poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT); + } + if (r == -1) { + return KVSTORAGE_ERROR_INTERNAL_ERROR; + } + /* Now we can read reply */ + buf->pos = 0; + buf->len = buflen; + while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) { + poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN); + } + + return rspamd_kvstorage_parse_reply_error (buf); } /** @@ -333,7 +686,35 @@ enum rspamd_kvstorage_error rspamd_kvstorage_delete_sync (struct rspamd_kvstorage_connection *conn, const gpointer key) { - return KVSTORAGE_ERROR_OK; + struct kvstorage_buf *buf; + gint r, keylen, buflen; + + if (conn == NULL || conn->state != KV_STATE_CONNECTED) { + return KVSTORAGE_ERROR_INTERNAL_ERROR; + } + + /* Create buf */ + keylen = strlen (key); + buflen = MAX (keylen + sizeof ("delete" CRLF), MAX_KV_LINE); + buf = rspamd_kvstorage_buf_create (buflen, conn->pool); + + r = rspamd_snprintf (buf->data, buf->len, "delete %*s" CRLF, + keylen, key); + buf->len = r; + while ((r = rspamd_kvstorage_buf_writeall (buf, conn)) > 0) { + poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_OUT); + } + if (r == -1) { + return KVSTORAGE_ERROR_INTERNAL_ERROR; + } + /* Now we can read reply */ + buf->len = buflen; + buf->pos = 0; + while ((r = rspamd_kvstorage_buf_readline (buf, conn)) == EAGAIN) { + poll_sync_socket (conn->sock, tv_to_msec (&conn->tv), POLL_IN); + } + + return rspamd_kvstorage_parse_reply_error (buf); } /** diff --git a/lib/kvstorage/libkvstorageclient.h b/lib/kvstorage/libkvstorageclient.h index 788303adc..262c5e89e 100644 --- a/lib/kvstorage/libkvstorageclient.h +++ b/lib/kvstorage/libkvstorageclient.h @@ -31,9 +31,11 @@ enum rspamd_kvstorage_error { KVSTORAGE_ERROR_OK = 0, KVSTORAGE_ERROR_TIMEOUT, - KVSTORAGE_ERROR_NOT_EXIST, + KVSTORAGE_ERROR_NOT_FOUND, KVSTORAGE_ERROR_NOT_STORED, + KVSTORAGE_ERROR_EXISTS, KVSTORAGE_ERROR_SERVER_ERROR, + KVSTORAGE_ERROR_CLIENT_ERROR, KVSTORAGE_ERROR_INTERNAL_ERROR }; @@ -120,7 +122,7 @@ enum rspamd_kvstorage_error rspamd_kvstorage_connect_sync (const gchar *host, * @param value value readed */ enum rspamd_kvstorage_error rspamd_kvstorage_get_sync (struct rspamd_kvstorage_connection *conn, - const gpointer key, gpointer **value); + const gpointer key, gpointer **value, guint *len); /** * Write key synced diff --git a/src/util.h b/src/util.h index 7bdf3bdc4..0fb6f48bf 100644 --- a/src/util.h +++ b/src/util.h @@ -226,6 +226,7 @@ gchar * escape_braces_addr_fstr (memory_pool_t *pool, f_str_t *in); * Convert milliseconds to timeval fields */ #define msec_to_tv(msec, tv) do { (tv)->tv_sec = (msec) / 1000; (tv)->tv_usec = ((msec) - (tv)->tv_sec * 1000) * 1000; } while(0) +#define tv_to_msec(tv) (tv)->tv_sec * 1000 + (tv)->tv_usec / 1000 struct worker_task; struct rspamd_worker; |