aboutsummaryrefslogtreecommitdiffstats
path: root/memcached.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-06-19 18:55:56 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-06-19 18:55:56 +0400
commitdaca72b007e666fb9abebcfa7c27ed6ddcadfd58 (patch)
treeed9ce40de8c2aead8d2d542651600914a3969de7 /memcached.c
parente92bfae6a160b187f47092074c7f49989f71950d (diff)
downloadrspamd-daca72b007e666fb9abebcfa7c27ed6ddcadfd58.tar.gz
rspamd-daca72b007e666fb9abebcfa7c27ed6ddcadfd58.zip
* Rewrite memcached library to work with events (async model)
* Add simple test for new memcached library * Use glib variants of malloc and free functions in rspamd
Diffstat (limited to 'memcached.c')
-rw-r--r--memcached.c726
1 files changed, 391 insertions, 335 deletions
diff --git a/memcached.c b/memcached.c
index 91aa21fdb..566fe659f 100644
--- a/memcached.c
+++ b/memcached.c
@@ -21,6 +21,7 @@
#include <errno.h>
#include <fcntl.h>
#include <sys/uio.h>
+#include <event.h>
#include "memcached.h"
@@ -46,49 +47,381 @@ struct memc_udp_header
uint16_t unused;
};
+static void socket_callback (int fd, short what, void *arg);
+static int memc_parse_header (char *buf, size_t *len, char **end);
+
/*
- * Poll file descriptor for read or write during specified timeout
+ * Write to syslog if OPT_DEBUG is specified
*/
-static int
-poll_d (int fd, u_char want_read, u_char want_write, int timeout)
+static void
+memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...)
+{
+ va_list args;
+ if (ctx->options & MEMC_OPT_DEBUG) {
+ va_start (args, fmt);
+ syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
+ vsyslog (LOG_DEBUG, fmt, args);
+ va_end (args);
+ }
+}
+
+/*
+ * Callback for write command
+ */
+static void
+write_handler (int fd, short what, memcached_ctx_t *ctx)
+{
+ char read_buf[READ_BUFSIZ];
+ int retries;
+ ssize_t r;
+ struct memc_udp_header header;
+ struct iovec iov[4];
+
+ /* Write something to memcached */
+ if (what == EV_WRITE) {
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+
+ r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize);
+ memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf);
+
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ if (ctx->param->bufpos == 0) {
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ }
+ else {
+ iov[1].iov_base = NULL;
+ iov[1].iov_len = 0;
+ }
+ iov[2].iov_base = ctx->param->buf + ctx->param->bufpos;
+ iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+ iov[3].iov_base = CRLF;
+ iov[3].iov_len = sizeof (CRLF) - 1;
+ ctx->param->bufpos = writev (ctx->sock, iov, 4);
+ }
+ else {
+ iov[0].iov_base = read_buf;
+ iov[0].iov_len = r;
+ iov[1].iov_base = ctx->param->buf + ctx->param->bufpos;
+ iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+ iov[2].iov_base = CRLF;
+ iov[2].iov_len = sizeof (CRLF) - 1;
+ ctx->param->bufpos = writev (ctx->sock, iov, 3);
+ }
+ if (ctx->param->bufpos == ctx->param->bufsize) {
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else {
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ }
+ else if (what == EV_READ) {
+ /* Read header */
+ retries = 0;
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ event_del (&ctx->mem_ev);
+ }
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ retries ++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+ /* Increment count */
+ ctx->count++;
+ if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, EXISTS, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ event_del (&ctx->mem_ev);
+ }
+ else if (what == EV_TIMEOUT) {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ event_del (&ctx->mem_ev);
+ }
+}
+
+/*
+ * Callback for read command
+ */
+static void
+read_handler (int fd, short what, memcached_ctx_t *ctx)
{
- int r;
- struct pollfd fds[1];
+ char read_buf[READ_BUFSIZ];
+ char *p;
+ ssize_t r;
+ size_t datalen;
+ struct memc_udp_header header;
+ struct iovec iov[2];
+ int retries = 0;
- fds->fd = fd;
- fds->revents = 0;
- fds->events = 0;
+ if (what == EV_WRITE) {
+ /* Send command to memcached */
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
- if (want_read != 0) {
- fds->events |= POLLIN;
+ r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key);
+ memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf);
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ writev (ctx->sock, iov, 2);
+ }
+ else {
+ write (ctx->sock, read_buf, r);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
+ retries++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+
+ if (r > 0) {
+ read_buf[r] = 0;
+ if (ctx->param->bufpos == 0) {
+ r = memc_parse_header (read_buf, &datalen, &p);
+ if (r < 0) {
+ memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ goto cleanup;
+ }
+ else if (r == 0) {
+ memc_log (ctx, __LINE__, "memc_read: record does not exists");
+ ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+ goto cleanup;
+ }
+
+ if (datalen > ctx->param->bufsize) {
+ memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen);
+ ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data);
+ goto cleanup;
+ }
+ /* Check if we already have all data in buffer */
+ if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
+ /* Store all data in param's buffer */
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen);
+ /* Increment count */
+ ctx->count++;
+ ctx->callback (ctx, OK, ctx->callback_data);
+ goto cleanup;
+ }
+ /* Subtract from sum parsed header's length */
+ r -= p - read_buf;
+ }
+ else {
+ p = read_buf;
+ }
+
+ if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2,
+ END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
+ r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2;
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ goto cleanup;
+ }
+ /* Store this part of data in param's buffer */
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+ ctx->param->bufpos += r;
+ }
+ else {
+ memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ goto cleanup;
+ }
+
+ ctx->count++;
}
- if (want_write != 0) {
- fds->events |= POLLOUT;
+ else if (what == EV_TIMEOUT) {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ goto cleanup;
}
- while ((r = poll(fds, 1, timeout)) < 0) {
- if (errno != EINTR)
- break;
- }
+
+ return;
- return r;
+cleanup:
+ event_del (&ctx->mem_ev);
}
/*
- * Write to syslog if OPT_DEBUG is specified
+ * Callback for delete command
*/
static void
-memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...)
+delete_handler (int fd, short what, memcached_ctx_t *ctx)
{
- va_list args;
- if (ctx->options & MEMC_OPT_DEBUG) {
- va_start (args, fmt);
- syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
- vsyslog (LOG_DEBUG, fmt, args);
- va_end (args);
+ char read_buf[READ_BUFSIZ];
+ int retries;
+ ssize_t r;
+ struct memc_udp_header header;
+ struct iovec iov[2];
+
+ /* Write something to memcached */
+ if (what == EV_WRITE) {
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+ r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key);
+ memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf);
+
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ ctx->param->bufpos = writev (ctx->sock, iov, 2);
+ }
+ else {
+ write (ctx->sock, read_buf, r);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ /* Read header */
+ retries = 0;
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ event_del (&ctx->mem_ev);
+ }
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ retries ++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+ /* Increment count */
+ ctx->count++;
+ if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ event_del (&ctx->mem_ev);
+ }
+ else if (what == EV_TIMEOUT) {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ event_del (&ctx->mem_ev);
}
}
/*
+ * Callback for our socket events
+ */
+static void
+socket_callback (int fd, short what, void *arg)
+{
+ memcached_ctx_t *ctx = (memcached_ctx_t *)arg;
+
+ switch (ctx->op) {
+ case CMD_NULL:
+ /* Do nothing here */
+ break;
+ case CMD_CONNECT:
+ /* We have write readiness after connect call, so reinit event */
+ ctx->cmd = "connect";
+ if (what == EV_WRITE) {
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, NULL);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ }
+ break;
+ case CMD_WRITE:
+ write_handler (fd, what, ctx);
+ break;
+ case CMD_READ:
+ read_handler (fd, what, ctx);
+ break;
+ case CMD_DELETE:
+ delete_handler (fd, what, ctx);
+ break;
+ }
+}
+
+/*
+ * Common callback function for memcached operations if no user's callback is specified
+ */
+static void
+common_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error));
+}
+
+/*
* Make socket for udp connection
*/
static int
@@ -117,6 +450,9 @@ memc_make_udp_sock (memcached_ctx_t *ctx)
* Call connect to set default destination for datagrams
* May not block
*/
+ ctx->op = CMD_CONNECT;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, NULL);
return connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in));
}
@@ -153,16 +489,10 @@ memc_make_tcp_sock (memcached_ctx_t *ctx)
return -1;
}
}
- /* Get write readiness */
- if (poll_d (ctx->sock, 0, 1, ctx->timeout) == 1) {
- return 0;
- }
- else {
- memc_log (ctx, __LINE__, "memc_make_tcp_sock: poll() timeout");
- close (ctx->sock);
- ctx->sock = -1;
- return -1;
- }
+ ctx->op = CMD_CONNECT;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ return 0;
}
/*
@@ -202,160 +532,21 @@ memc_parse_header (char *buf, size_t *len, char **end)
return -1;
}
+
+
/*
* Common read command handler for memcached
*/
memc_error_t
memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem)
{
- char read_buf[READ_BUFSIZ];
- char *p;
- int i, retries;
- ssize_t r, sum = 0, written = 0;
- size_t datalen;
- struct memc_udp_header header;
- struct iovec iov[2];
-
+ int i;
for (i = 0; i < *nelem; i++) {
- if (ctx->protocol == UDP_TEXT) {
- /* Send udp header */
- bzero (&header, sizeof (header));
- header.dg_sent = htons (1);
- header.req_id = ctx->count;
- }
-
- r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, cmd, params[i].key);
- memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf);
- if (ctx->protocol == UDP_TEXT) {
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = r;
- writev (ctx->sock, iov, 2);
- }
- else {
- write (ctx->sock, read_buf, r);
- }
-
- /* Read reply from server */
- retries = 0;
- while (ctx->protocol == UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- return SERVER_TIMEOUT;
- }
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = READ_BUFSIZ;
- if ((r = readv (ctx->sock, iov, 2)) == -1) {
- return SERVER_ERROR;
- }
- memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
- if (header.req_id != ctx->count && retries < MAX_RETRIES) {
- memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
- retries++;
- /* Not our reply packet */
- continue;
- }
- break;
- }
- if (ctx->protocol != UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_read: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
- }
-
- if (r > 0) {
- sum += r;
- read_buf[r] = 0;
- r = memc_parse_header (read_buf, &datalen, &p);
- if (r < 0) {
- memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
- return SERVER_ERROR;
- }
- else if (r == 0) {
- memc_log (ctx, __LINE__, "memc_read: record does not exists");
- return NOT_EXISTS;
- }
-
- if (datalen != params[i].bufsize) {
- memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", params[i].bufsize, datalen);
- return WRONG_LENGTH;
- }
-
- /* Subtract from sum parsed header's length */
- sum -= p - read_buf;
- /* Check if we already have all data in buffer */
- if (sum >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
- /* Store all data in param's buffer */
- memcpy (params[i].buf, p, datalen);
- /* Increment count */
- ctx->count++;
- return OK;
- }
- else {
- /* Store this part of data in param's buffer */
- memcpy (params[i].buf, p, sum);
- written += sum;
- }
- }
- else {
- memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
- return SERVER_ERROR;
- }
- /* Read data from multiply datagrams */
- p = read_buf;
-
- while (sum < datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
- retries = 0;
- while (ctx->protocol == UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_read: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = READ_BUFSIZ;
- if ((r = readv (ctx->sock, iov, 2)) == -1) {
- memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
- return SERVER_ERROR;
- }
- if (header.req_id != ctx->count && retries < MAX_RETRIES) {
- memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
- retries ++;
- /* Not our reply packet */
- continue;
- }
- }
- if (ctx->protocol != UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_read: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
- }
-
- p = read_buf;
- sum += r;
- if (r <= 0) {
- break;
- }
- /* Copy received buffer to result buffer */
- while (r--) {
- /* Break on reading END\r\n */
- if (strncmp (p, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
- break;
- }
- if (written < datalen) {
- params[i].buf[written++] = *p++;
- }
- }
- }
- /* Increment count */
- ctx->count++;
+ ctx->cmd = cmd;
+ ctx->op = CMD_READ;
+ ctx->param = &params[i];
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
}
return OK;
@@ -367,97 +558,13 @@ memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, siz
memc_error_t
memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire)
{
- char read_buf[READ_BUFSIZ];
- int i, retries, ofl;
- ssize_t r;
- struct memc_udp_header header;
- struct iovec iov[4];
-
+ int i;
for (i = 0; i < *nelem; i++) {
- if (ctx->protocol == UDP_TEXT) {
- /* Send udp header */
- bzero (&header, sizeof (header));
- header.dg_sent = htons (1);
- header.req_id = ctx->count;
- }
-
- r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, cmd, params[i].key, expire, params[i].bufsize);
- memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf);
- /* Set socket blocking */
- ofl = fcntl(ctx->sock, F_GETFL, 0);
- fcntl(ctx->sock, F_SETFL, ofl & (~O_NONBLOCK));
-
- if (ctx->protocol == UDP_TEXT) {
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = r;
- iov[2].iov_base = params[i].buf;
- iov[2].iov_len = params[i].bufsize;
- iov[3].iov_base = CRLF;
- iov[3].iov_len = sizeof (CRLF) - 1;
- writev (ctx->sock, iov, 4);
- }
- else {
- iov[0].iov_base = read_buf;
- iov[0].iov_len = r;
- iov[1].iov_base = params[i].buf;
- iov[1].iov_len = params[i].bufsize;
- iov[2].iov_base = CRLF;
- iov[2].iov_len = sizeof (CRLF) - 1;
- writev (ctx->sock, iov, 3);
- }
-
- /* Restore socket mode */
- fcntl(ctx->sock, F_SETFL, ofl);
- /* Read reply from server */
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_write: server timeout while reading reply");
- return SERVER_ERROR;
- }
- /* Read header */
- retries = 0;
- while (ctx->protocol == UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_write: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = READ_BUFSIZ;
- if ((r = readv (ctx->sock, iov, 2)) == -1) {
- return SERVER_ERROR;
- }
- if (header.req_id != ctx->count && retries < MAX_RETRIES) {
- retries ++;
- /* Not our reply packet */
- continue;
- }
- break;
- }
- if (ctx->protocol != UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_write: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
- }
- /* Increment count */
- ctx->count++;
-
- if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
- continue;
- }
- else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) {
- return CLIENT_ERROR;
- }
- else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) {
- return EXISTS;
- }
- else {
- return SERVER_ERROR;
- }
+ ctx->cmd = cmd;
+ ctx->op = CMD_WRITE;
+ ctx->param = &params[i];
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
}
return OK;
@@ -468,71 +575,14 @@ memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, si
memc_error_t
memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem)
{
- char read_buf[READ_BUFSIZ];
- int i, retries;
- ssize_t r;
- struct memc_udp_header header;
- struct iovec iov[2];
+ int i;
for (i = 0; i < *nelem; i++) {
- if (ctx->protocol == UDP_TEXT) {
- /* Send udp header */
- bzero (&header, sizeof (header));
- header.dg_sent = htons(1);
- header.req_id = ctx->count;
- }
-
- r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, params[i].key);
- memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf);
- if (ctx->protocol == UDP_TEXT) {
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = r;
- writev (ctx->sock, iov, 2);
- }
- else {
- write (ctx->sock, read_buf, r);
- }
-
- /* Read reply from server */
- retries = 0;
- while (ctx->protocol == UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- return SERVER_TIMEOUT;
- }
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = READ_BUFSIZ;
- if ((r = readv (ctx->sock, iov, 2)) == -1) {
- return SERVER_ERROR;
- }
- if (header.req_id != ctx->count && retries < MAX_RETRIES) {
- retries ++;
- /* Not our reply packet */
- continue;
- }
- break;
- }
- if (ctx->protocol != UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- return SERVER_TIMEOUT;
- }
- r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
- }
-
- /* Increment count */
- ctx->count++;
- if (strncmp (read_buf, DELETED_TRAILER, sizeof (DELETED_TRAILER) - 1) == 0) {
- continue;
- }
- else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) {
- return NOT_EXISTS;
- }
- else {
- return SERVER_ERROR;
- }
+ ctx->cmd = "delete";
+ ctx->op = CMD_DELETE;
+ ctx->param = &params[i];
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
}
return OK;
@@ -630,6 +680,11 @@ memc_init_ctx (memcached_ctx_t *ctx)
ctx->count = 0;
ctx->alive = 1;
+ ctx->op = CMD_NULL;
+ /* Set default callback */
+ if (ctx->callback == NULL) {
+ ctx->callback = common_memc_callback;
+ }
switch (ctx->protocol) {
case UDP_TEXT:
@@ -675,6 +730,7 @@ int
memc_close_ctx (memcached_ctx_t *ctx)
{
if (ctx != NULL && ctx->sock != -1) {
+ event_del (&ctx->mem_ev);
return close (ctx->sock);
}