diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2008-06-19 18:55:56 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2008-06-19 18:55:56 +0400 |
commit | daca72b007e666fb9abebcfa7c27ed6ddcadfd58 (patch) | |
tree | ed9ce40de8c2aead8d2d542651600914a3969de7 /memcached.c | |
parent | e92bfae6a160b187f47092074c7f49989f71950d (diff) | |
download | rspamd-daca72b007e666fb9abebcfa7c27ed6ddcadfd58.tar.gz rspamd-daca72b007e666fb9abebcfa7c27ed6ddcadfd58.zip |
* Rewrite memcached library to work with events (async model)
* Add simple test for new memcached library
* Use glib variants of malloc and free functions in rspamd
Diffstat (limited to 'memcached.c')
-rw-r--r-- | memcached.c | 726 |
1 files changed, 391 insertions, 335 deletions
diff --git a/memcached.c b/memcached.c index 91aa21fdb..566fe659f 100644 --- a/memcached.c +++ b/memcached.c @@ -21,6 +21,7 @@ #include <errno.h> #include <fcntl.h> #include <sys/uio.h> +#include <event.h> #include "memcached.h" @@ -46,49 +47,381 @@ struct memc_udp_header uint16_t unused; }; +static void socket_callback (int fd, short what, void *arg); +static int memc_parse_header (char *buf, size_t *len, char **end); + /* - * Poll file descriptor for read or write during specified timeout + * Write to syslog if OPT_DEBUG is specified */ -static int -poll_d (int fd, u_char want_read, u_char want_write, int timeout) +static void +memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...) +{ + va_list args; + if (ctx->options & MEMC_OPT_DEBUG) { + va_start (args, fmt); + syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port)); + vsyslog (LOG_DEBUG, fmt, args); + va_end (args); + } +} + +/* + * Callback for write command + */ +static void +write_handler (int fd, short what, memcached_ctx_t *ctx) +{ + char read_buf[READ_BUFSIZ]; + int retries; + ssize_t r; + struct memc_udp_header header; + struct iovec iov[4]; + + /* Write something to memcached */ + if (what == EV_WRITE) { + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + + r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize); + memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf); + + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + if (ctx->param->bufpos == 0) { + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + } + else { + iov[1].iov_base = NULL; + iov[1].iov_len = 0; + } + iov[2].iov_base = ctx->param->buf + ctx->param->bufpos; + iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos; + iov[3].iov_base = CRLF; + iov[3].iov_len = sizeof (CRLF) - 1; + ctx->param->bufpos = writev (ctx->sock, iov, 4); + } + else { + iov[0].iov_base = read_buf; + iov[0].iov_len = r; + iov[1].iov_base = ctx->param->buf + ctx->param->bufpos; + iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos; + iov[2].iov_base = CRLF; + iov[2].iov_len = sizeof (CRLF) - 1; + ctx->param->bufpos = writev (ctx->sock, iov, 3); + } + if (ctx->param->bufpos == ctx->param->bufsize) { + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else { + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + } + else if (what == EV_READ) { + /* Read header */ + retries = 0; + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + event_del (&ctx->mem_ev); + } + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + retries ++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + /* Increment count */ + ctx->count++; + if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, OK, ctx->callback_data); + } + else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data); + } + else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) { + ctx->callback (ctx, EXISTS, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + event_del (&ctx->mem_ev); + } + else if (what == EV_TIMEOUT) { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + event_del (&ctx->mem_ev); + } +} + +/* + * Callback for read command + */ +static void +read_handler (int fd, short what, memcached_ctx_t *ctx) { - int r; - struct pollfd fds[1]; + char read_buf[READ_BUFSIZ]; + char *p; + ssize_t r; + size_t datalen; + struct memc_udp_header header; + struct iovec iov[2]; + int retries = 0; - fds->fd = fd; - fds->revents = 0; - fds->events = 0; + if (what == EV_WRITE) { + /* Send command to memcached */ + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } - if (want_read != 0) { - fds->events |= POLLIN; + r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key); + memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf); + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + writev (ctx->sock, iov, 2); + } + else { + write (ctx->sock, read_buf, r); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf); + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count); + retries++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + + if (r > 0) { + read_buf[r] = 0; + if (ctx->param->bufpos == 0) { + r = memc_parse_header (read_buf, &datalen, &p); + if (r < 0) { + memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply"); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + goto cleanup; + } + else if (r == 0) { + memc_log (ctx, __LINE__, "memc_read: record does not exists"); + ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); + goto cleanup; + } + + if (datalen > ctx->param->bufsize) { + memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen); + ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data); + goto cleanup; + } + /* Check if we already have all data in buffer */ + if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) { + /* Store all data in param's buffer */ + memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen); + /* Increment count */ + ctx->count++; + ctx->callback (ctx, OK, ctx->callback_data); + goto cleanup; + } + /* Subtract from sum parsed header's length */ + r -= p - read_buf; + } + else { + p = read_buf; + } + + if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2, + END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { + r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2; + memcpy (ctx->param->buf + ctx->param->bufpos, p, r); + ctx->callback (ctx, OK, ctx->callback_data); + goto cleanup; + } + /* Store this part of data in param's buffer */ + memcpy (ctx->param->buf + ctx->param->bufpos, p, r); + ctx->param->bufpos += r; + } + else { + memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + goto cleanup; + } + + ctx->count++; } - if (want_write != 0) { - fds->events |= POLLOUT; + else if (what == EV_TIMEOUT) { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + goto cleanup; } - while ((r = poll(fds, 1, timeout)) < 0) { - if (errno != EINTR) - break; - } + + return; - return r; +cleanup: + event_del (&ctx->mem_ev); } /* - * Write to syslog if OPT_DEBUG is specified + * Callback for delete command */ static void -memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...) +delete_handler (int fd, short what, memcached_ctx_t *ctx) { - va_list args; - if (ctx->options & MEMC_OPT_DEBUG) { - va_start (args, fmt); - syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port)); - vsyslog (LOG_DEBUG, fmt, args); - va_end (args); + char read_buf[READ_BUFSIZ]; + int retries; + ssize_t r; + struct memc_udp_header header; + struct iovec iov[2]; + + /* Write something to memcached */ + if (what == EV_WRITE) { + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key); + memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf); + + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + ctx->param->bufpos = writev (ctx->sock, iov, 2); + } + else { + write (ctx->sock, read_buf, r); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + /* Read header */ + retries = 0; + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + event_del (&ctx->mem_ev); + } + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + retries ++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + /* Increment count */ + ctx->count++; + if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, OK, ctx->callback_data); + } + else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) { + ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + event_del (&ctx->mem_ev); + } + else if (what == EV_TIMEOUT) { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + event_del (&ctx->mem_ev); } } /* + * Callback for our socket events + */ +static void +socket_callback (int fd, short what, void *arg) +{ + memcached_ctx_t *ctx = (memcached_ctx_t *)arg; + + switch (ctx->op) { + case CMD_NULL: + /* Do nothing here */ + break; + case CMD_CONNECT: + /* We have write readiness after connect call, so reinit event */ + ctx->cmd = "connect"; + if (what == EV_WRITE) { + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, NULL); + ctx->callback (ctx, OK, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + } + break; + case CMD_WRITE: + write_handler (fd, what, ctx); + break; + case CMD_READ: + read_handler (fd, what, ctx); + break; + case CMD_DELETE: + delete_handler (fd, what, ctx); + break; + } +} + +/* + * Common callback function for memcached operations if no user's callback is specified + */ +static void +common_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error)); +} + +/* * Make socket for udp connection */ static int @@ -117,6 +450,9 @@ memc_make_udp_sock (memcached_ctx_t *ctx) * Call connect to set default destination for datagrams * May not block */ + ctx->op = CMD_CONNECT; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, NULL); return connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in)); } @@ -153,16 +489,10 @@ memc_make_tcp_sock (memcached_ctx_t *ctx) return -1; } } - /* Get write readiness */ - if (poll_d (ctx->sock, 0, 1, ctx->timeout) == 1) { - return 0; - } - else { - memc_log (ctx, __LINE__, "memc_make_tcp_sock: poll() timeout"); - close (ctx->sock); - ctx->sock = -1; - return -1; - } + ctx->op = CMD_CONNECT; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + return 0; } /* @@ -202,160 +532,21 @@ memc_parse_header (char *buf, size_t *len, char **end) return -1; } + + /* * Common read command handler for memcached */ memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem) { - char read_buf[READ_BUFSIZ]; - char *p; - int i, retries; - ssize_t r, sum = 0, written = 0; - size_t datalen; - struct memc_udp_header header; - struct iovec iov[2]; - + int i; for (i = 0; i < *nelem; i++) { - if (ctx->protocol == UDP_TEXT) { - /* Send udp header */ - bzero (&header, sizeof (header)); - header.dg_sent = htons (1); - header.req_id = ctx->count; - } - - r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, cmd, params[i].key); - memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf); - if (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = r; - writev (ctx->sock, iov, 2); - } - else { - write (ctx->sock, read_buf, r); - } - - /* Read reply from server */ - retries = 0; - while (ctx->protocol == UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - return SERVER_TIMEOUT; - } - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - return SERVER_ERROR; - } - memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf); - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count); - retries++; - /* Not our reply packet */ - continue; - } - break; - } - if (ctx->protocol != UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_read: timeout waiting reply"); - return SERVER_TIMEOUT; - } - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - - if (r > 0) { - sum += r; - read_buf[r] = 0; - r = memc_parse_header (read_buf, &datalen, &p); - if (r < 0) { - memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply"); - return SERVER_ERROR; - } - else if (r == 0) { - memc_log (ctx, __LINE__, "memc_read: record does not exists"); - return NOT_EXISTS; - } - - if (datalen != params[i].bufsize) { - memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", params[i].bufsize, datalen); - return WRONG_LENGTH; - } - - /* Subtract from sum parsed header's length */ - sum -= p - read_buf; - /* Check if we already have all data in buffer */ - if (sum >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) { - /* Store all data in param's buffer */ - memcpy (params[i].buf, p, datalen); - /* Increment count */ - ctx->count++; - return OK; - } - else { - /* Store this part of data in param's buffer */ - memcpy (params[i].buf, p, sum); - written += sum; - } - } - else { - memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r); - return SERVER_ERROR; - } - /* Read data from multiply datagrams */ - p = read_buf; - - while (sum < datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) { - retries = 0; - while (ctx->protocol == UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_read: timeout waiting reply"); - return SERVER_TIMEOUT; - } - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r); - return SERVER_ERROR; - } - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count); - retries ++; - /* Not our reply packet */ - continue; - } - } - if (ctx->protocol != UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_read: timeout waiting reply"); - return SERVER_TIMEOUT; - } - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - - p = read_buf; - sum += r; - if (r <= 0) { - break; - } - /* Copy received buffer to result buffer */ - while (r--) { - /* Break on reading END\r\n */ - if (strncmp (p, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { - break; - } - if (written < datalen) { - params[i].buf[written++] = *p++; - } - } - } - /* Increment count */ - ctx->count++; + ctx->cmd = cmd; + ctx->op = CMD_READ; + ctx->param = ¶ms[i]; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); } return OK; @@ -367,97 +558,13 @@ memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, siz memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire) { - char read_buf[READ_BUFSIZ]; - int i, retries, ofl; - ssize_t r; - struct memc_udp_header header; - struct iovec iov[4]; - + int i; for (i = 0; i < *nelem; i++) { - if (ctx->protocol == UDP_TEXT) { - /* Send udp header */ - bzero (&header, sizeof (header)); - header.dg_sent = htons (1); - header.req_id = ctx->count; - } - - r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, cmd, params[i].key, expire, params[i].bufsize); - memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf); - /* Set socket blocking */ - ofl = fcntl(ctx->sock, F_GETFL, 0); - fcntl(ctx->sock, F_SETFL, ofl & (~O_NONBLOCK)); - - if (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = r; - iov[2].iov_base = params[i].buf; - iov[2].iov_len = params[i].bufsize; - iov[3].iov_base = CRLF; - iov[3].iov_len = sizeof (CRLF) - 1; - writev (ctx->sock, iov, 4); - } - else { - iov[0].iov_base = read_buf; - iov[0].iov_len = r; - iov[1].iov_base = params[i].buf; - iov[1].iov_len = params[i].bufsize; - iov[2].iov_base = CRLF; - iov[2].iov_len = sizeof (CRLF) - 1; - writev (ctx->sock, iov, 3); - } - - /* Restore socket mode */ - fcntl(ctx->sock, F_SETFL, ofl); - /* Read reply from server */ - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_write: server timeout while reading reply"); - return SERVER_ERROR; - } - /* Read header */ - retries = 0; - while (ctx->protocol == UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_write: timeout waiting reply"); - return SERVER_TIMEOUT; - } - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - return SERVER_ERROR; - } - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - retries ++; - /* Not our reply packet */ - continue; - } - break; - } - if (ctx->protocol != UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_write: timeout waiting reply"); - return SERVER_TIMEOUT; - } - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - /* Increment count */ - ctx->count++; - - if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { - continue; - } - else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) { - return CLIENT_ERROR; - } - else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) { - return EXISTS; - } - else { - return SERVER_ERROR; - } + ctx->cmd = cmd; + ctx->op = CMD_WRITE; + ctx->param = ¶ms[i]; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); } return OK; @@ -468,71 +575,14 @@ memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, si memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem) { - char read_buf[READ_BUFSIZ]; - int i, retries; - ssize_t r; - struct memc_udp_header header; - struct iovec iov[2]; + int i; for (i = 0; i < *nelem; i++) { - if (ctx->protocol == UDP_TEXT) { - /* Send udp header */ - bzero (&header, sizeof (header)); - header.dg_sent = htons(1); - header.req_id = ctx->count; - } - - r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, params[i].key); - memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf); - if (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = r; - writev (ctx->sock, iov, 2); - } - else { - write (ctx->sock, read_buf, r); - } - - /* Read reply from server */ - retries = 0; - while (ctx->protocol == UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - return SERVER_TIMEOUT; - } - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - return SERVER_ERROR; - } - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - retries ++; - /* Not our reply packet */ - continue; - } - break; - } - if (ctx->protocol != UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - return SERVER_TIMEOUT; - } - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - - /* Increment count */ - ctx->count++; - if (strncmp (read_buf, DELETED_TRAILER, sizeof (DELETED_TRAILER) - 1) == 0) { - continue; - } - else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) { - return NOT_EXISTS; - } - else { - return SERVER_ERROR; - } + ctx->cmd = "delete"; + ctx->op = CMD_DELETE; + ctx->param = ¶ms[i]; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); } return OK; @@ -630,6 +680,11 @@ memc_init_ctx (memcached_ctx_t *ctx) ctx->count = 0; ctx->alive = 1; + ctx->op = CMD_NULL; + /* Set default callback */ + if (ctx->callback == NULL) { + ctx->callback = common_memc_callback; + } switch (ctx->protocol) { case UDP_TEXT: @@ -675,6 +730,7 @@ int memc_close_ctx (memcached_ctx_t *ctx) { if (ctx != NULL && ctx->sock != -1) { + event_del (&ctx->mem_ev); return close (ctx->sock); } |