diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2008-06-19 18:55:56 +0400 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2008-06-19 18:55:56 +0400 |
commit | daca72b007e666fb9abebcfa7c27ed6ddcadfd58 (patch) | |
tree | ed9ce40de8c2aead8d2d542651600914a3969de7 | |
parent | e92bfae6a160b187f47092074c7f49989f71950d (diff) | |
download | rspamd-daca72b007e666fb9abebcfa7c27ed6ddcadfd58.tar.gz rspamd-daca72b007e666fb9abebcfa7c27ed6ddcadfd58.zip |
* Rewrite memcached library to work with events (async model)
* Add simple test for new memcached library
* Use glib variants of malloc and free functions in rspamd
-rw-r--r-- | cfg_utils.c | 6 | ||||
-rw-r--r-- | main.c | 20 | ||||
-rw-r--r-- | memcached-test.c | 79 | ||||
-rw-r--r-- | memcached.c | 726 | ||||
-rw-r--r-- | memcached.h | 41 | ||||
-rw-r--r-- | util.c | 6 |
6 files changed, 521 insertions, 357 deletions
diff --git a/cfg_utils.c b/cfg_utils.c index d682310b0..277a8d8e6 100644 --- a/cfg_utils.c +++ b/cfg_utils.c @@ -129,13 +129,13 @@ void free_config (struct config_file *cfg) { if (cfg->pid_file) { - free (cfg->pid_file); + g_free (cfg->pid_file); } if (cfg->temp_dir) { - free (cfg->temp_dir); + g_free (cfg->temp_dir); } if (cfg->bind_host) { - free (cfg->bind_host); + g_free (cfg->bind_host); } } @@ -61,11 +61,11 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro FILE *f; struct config_file *tmp_cfg; /* Starting worker process */ - cur = (struct rspamd_worker *)malloc (sizeof (struct rspamd_worker)); + cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker)); if (cur) { /* Reconfig needed */ if (reconfig) { - tmp_cfg = (struct config_file *) malloc (sizeof (struct config_file)); + tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file)); if (tmp_cfg) { cfg_file = strdup (rspamd->cfg->cfg_name); bzero (tmp_cfg, sizeof (struct config_file)); @@ -83,7 +83,7 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro } else { free_config (rspamd->cfg); - free (rspamd->cfg); + g_free (rspamd->cfg); rspamd->cfg = tmp_cfg; rspamd->cfg->cfg_name = cfg_file; } @@ -130,9 +130,9 @@ main (int argc, char **argv) FILE *f; pid_t wrk; - rspamd = (struct rspamd_main *)malloc (sizeof (struct rspamd_main)); + rspamd = (struct rspamd_main *)g_malloc (sizeof (struct rspamd_main)); bzero (rspamd, sizeof (struct rspamd_main)); - cfg = (struct config_file *)malloc (sizeof (struct config_file)); + cfg = (struct config_file *)g_malloc (sizeof (struct config_file)); rspamd->cfg = cfg; if (!rspamd || !rspamd->cfg) { fprintf(stderr, "Cannot allocate memory\n"); @@ -209,7 +209,7 @@ main (int argc, char **argv) } } else { - un_addr = (struct sockaddr_un *) malloc (sizeof (struct sockaddr_un)); + un_addr = (struct sockaddr_un *) g_malloc (sizeof (struct sockaddr_un)); if (!un_addr || (listen_sock = make_unix_socket (rspamd->cfg->bind_host, un_addr)) == -1) { msg_err ("main: cannot create unix listen socket. %m"); exit(-errno); @@ -267,7 +267,7 @@ main (int argc, char **argv) /* Fork another worker in replace of dead one */ fork_worker (rspamd, listen_sock, 0, cur->type); } - free (cur); + g_free (cur); } } } @@ -303,7 +303,7 @@ main (int argc, char **argv) waitpid (cur->pid, &res, 0); msg_debug ("main(cleaning): worker process %d terminated", cur->pid); TAILQ_REMOVE(&rspamd->workers, cur, next); - free(cur); + g_free(cur); } msg_info ("main: terminating..."); @@ -314,8 +314,8 @@ main (int argc, char **argv) } free_config (rspamd->cfg); - free (rspamd->cfg); - free (rspamd); + g_free (rspamd->cfg); + g_free (rspamd); return (res); } diff --git a/memcached-test.c b/memcached-test.c new file mode 100644 index 000000000..c258673bd --- /dev/null +++ b/memcached-test.c @@ -0,0 +1,79 @@ +#include <stdio.h> +#include <stdlib.h> +#include <netdb.h> +#include <errno.h> +#include <string.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <event.h> + +#include "upstream.h" +#include "memcached.h" + +#define HOST "127.0.0.1" +#define PORT 11211 + +memcached_param_t cur_param; + +static void +test_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + int s; + int r; + int *num = ((int *)data); + printf ("result of memc command '%s' is '%s'\n", ctx->cmd, memc_strerror (error)); + /* Connect */ + if (*num == 0) { + printf ("Setting value to memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf); + s = 1; + r = memc_set (ctx, &cur_param, &s, 60); + (*num)++; + } + else if (*num == 1) { + printf ("Getting value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf); + s = 1; + r = memc_get (ctx, &cur_param, &s); + (*num)++; + } + else { + printf ("Got value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf); + event_loopexit (NULL); + } +} + + +int +main (int argc, char **argv) +{ + memcached_ctx_t mctx; + char *addr, buf[512]; + int num = 0; + + event_init (); + strcpy (cur_param.key, "testkey"); + strcpy (buf, "test_value"); + cur_param.buf = buf; + cur_param.bufsize = sizeof ("test_value") - 1; + + if (argc == 2) { + addr = argv[1]; + } + else { + addr = HOST; + } + + mctx.protocol = TCP_TEXT; + mctx.timeout.tv_sec = 1; + mctx.timeout.tv_usec = 0; + mctx.port = htons (PORT); + mctx.options = MEMC_OPT_DEBUG; + mctx.callback = test_memc_callback; + /* XXX: it is wrong to use local variable pointer here */ + mctx.callback_data = (void *)# + inet_aton (addr, &mctx.addr); + + memc_init_ctx (&mctx); + + event_loop (0); + return 0; +} diff --git a/memcached.c b/memcached.c index 91aa21fdb..566fe659f 100644 --- a/memcached.c +++ b/memcached.c @@ -21,6 +21,7 @@ #include <errno.h> #include <fcntl.h> #include <sys/uio.h> +#include <event.h> #include "memcached.h" @@ -46,49 +47,381 @@ struct memc_udp_header uint16_t unused; }; +static void socket_callback (int fd, short what, void *arg); +static int memc_parse_header (char *buf, size_t *len, char **end); + /* - * Poll file descriptor for read or write during specified timeout + * Write to syslog if OPT_DEBUG is specified */ -static int -poll_d (int fd, u_char want_read, u_char want_write, int timeout) +static void +memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...) +{ + va_list args; + if (ctx->options & MEMC_OPT_DEBUG) { + va_start (args, fmt); + syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port)); + vsyslog (LOG_DEBUG, fmt, args); + va_end (args); + } +} + +/* + * Callback for write command + */ +static void +write_handler (int fd, short what, memcached_ctx_t *ctx) +{ + char read_buf[READ_BUFSIZ]; + int retries; + ssize_t r; + struct memc_udp_header header; + struct iovec iov[4]; + + /* Write something to memcached */ + if (what == EV_WRITE) { + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + + r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize); + memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf); + + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + if (ctx->param->bufpos == 0) { + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + } + else { + iov[1].iov_base = NULL; + iov[1].iov_len = 0; + } + iov[2].iov_base = ctx->param->buf + ctx->param->bufpos; + iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos; + iov[3].iov_base = CRLF; + iov[3].iov_len = sizeof (CRLF) - 1; + ctx->param->bufpos = writev (ctx->sock, iov, 4); + } + else { + iov[0].iov_base = read_buf; + iov[0].iov_len = r; + iov[1].iov_base = ctx->param->buf + ctx->param->bufpos; + iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos; + iov[2].iov_base = CRLF; + iov[2].iov_len = sizeof (CRLF) - 1; + ctx->param->bufpos = writev (ctx->sock, iov, 3); + } + if (ctx->param->bufpos == ctx->param->bufsize) { + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else { + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + } + else if (what == EV_READ) { + /* Read header */ + retries = 0; + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + event_del (&ctx->mem_ev); + } + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + retries ++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + /* Increment count */ + ctx->count++; + if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, OK, ctx->callback_data); + } + else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data); + } + else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) { + ctx->callback (ctx, EXISTS, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + event_del (&ctx->mem_ev); + } + else if (what == EV_TIMEOUT) { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + event_del (&ctx->mem_ev); + } +} + +/* + * Callback for read command + */ +static void +read_handler (int fd, short what, memcached_ctx_t *ctx) { - int r; - struct pollfd fds[1]; + char read_buf[READ_BUFSIZ]; + char *p; + ssize_t r; + size_t datalen; + struct memc_udp_header header; + struct iovec iov[2]; + int retries = 0; - fds->fd = fd; - fds->revents = 0; - fds->events = 0; + if (what == EV_WRITE) { + /* Send command to memcached */ + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } - if (want_read != 0) { - fds->events |= POLLIN; + r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key); + memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf); + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + writev (ctx->sock, iov, 2); + } + else { + write (ctx->sock, read_buf, r); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf); + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count); + retries++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + + if (r > 0) { + read_buf[r] = 0; + if (ctx->param->bufpos == 0) { + r = memc_parse_header (read_buf, &datalen, &p); + if (r < 0) { + memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply"); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + goto cleanup; + } + else if (r == 0) { + memc_log (ctx, __LINE__, "memc_read: record does not exists"); + ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); + goto cleanup; + } + + if (datalen > ctx->param->bufsize) { + memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen); + ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data); + goto cleanup; + } + /* Check if we already have all data in buffer */ + if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) { + /* Store all data in param's buffer */ + memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen); + /* Increment count */ + ctx->count++; + ctx->callback (ctx, OK, ctx->callback_data); + goto cleanup; + } + /* Subtract from sum parsed header's length */ + r -= p - read_buf; + } + else { + p = read_buf; + } + + if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2, + END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { + r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2; + memcpy (ctx->param->buf + ctx->param->bufpos, p, r); + ctx->callback (ctx, OK, ctx->callback_data); + goto cleanup; + } + /* Store this part of data in param's buffer */ + memcpy (ctx->param->buf + ctx->param->bufpos, p, r); + ctx->param->bufpos += r; + } + else { + memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + goto cleanup; + } + + ctx->count++; } - if (want_write != 0) { - fds->events |= POLLOUT; + else if (what == EV_TIMEOUT) { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + goto cleanup; } - while ((r = poll(fds, 1, timeout)) < 0) { - if (errno != EINTR) - break; - } + + return; - return r; +cleanup: + event_del (&ctx->mem_ev); } /* - * Write to syslog if OPT_DEBUG is specified + * Callback for delete command */ static void -memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...) +delete_handler (int fd, short what, memcached_ctx_t *ctx) { - va_list args; - if (ctx->options & MEMC_OPT_DEBUG) { - va_start (args, fmt); - syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port)); - vsyslog (LOG_DEBUG, fmt, args); - va_end (args); + char read_buf[READ_BUFSIZ]; + int retries; + ssize_t r; + struct memc_udp_header header; + struct iovec iov[2]; + + /* Write something to memcached */ + if (what == EV_WRITE) { + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key); + memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf); + + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + ctx->param->bufpos = writev (ctx->sock, iov, 2); + } + else { + write (ctx->sock, read_buf, r); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + /* Read header */ + retries = 0; + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + event_del (&ctx->mem_ev); + } + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + retries ++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + /* Increment count */ + ctx->count++; + if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, OK, ctx->callback_data); + } + else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) { + ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + event_del (&ctx->mem_ev); + } + else if (what == EV_TIMEOUT) { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + event_del (&ctx->mem_ev); } } /* + * Callback for our socket events + */ +static void +socket_callback (int fd, short what, void *arg) +{ + memcached_ctx_t *ctx = (memcached_ctx_t *)arg; + + switch (ctx->op) { + case CMD_NULL: + /* Do nothing here */ + break; + case CMD_CONNECT: + /* We have write readiness after connect call, so reinit event */ + ctx->cmd = "connect"; + if (what == EV_WRITE) { + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, NULL); + ctx->callback (ctx, OK, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + } + break; + case CMD_WRITE: + write_handler (fd, what, ctx); + break; + case CMD_READ: + read_handler (fd, what, ctx); + break; + case CMD_DELETE: + delete_handler (fd, what, ctx); + break; + } +} + +/* + * Common callback function for memcached operations if no user's callback is specified + */ +static void +common_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error)); +} + +/* * Make socket for udp connection */ static int @@ -117,6 +450,9 @@ memc_make_udp_sock (memcached_ctx_t *ctx) * Call connect to set default destination for datagrams * May not block */ + ctx->op = CMD_CONNECT; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, NULL); return connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in)); } @@ -153,16 +489,10 @@ memc_make_tcp_sock (memcached_ctx_t *ctx) return -1; } } - /* Get write readiness */ - if (poll_d (ctx->sock, 0, 1, ctx->timeout) == 1) { - return 0; - } - else { - memc_log (ctx, __LINE__, "memc_make_tcp_sock: poll() timeout"); - close (ctx->sock); - ctx->sock = -1; - return -1; - } + ctx->op = CMD_CONNECT; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + return 0; } /* @@ -202,160 +532,21 @@ memc_parse_header (char *buf, size_t *len, char **end) return -1; } + + /* * Common read command handler for memcached */ memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem) { - char read_buf[READ_BUFSIZ]; - char *p; - int i, retries; - ssize_t r, sum = 0, written = 0; - size_t datalen; - struct memc_udp_header header; - struct iovec iov[2]; - + int i; for (i = 0; i < *nelem; i++) { - if (ctx->protocol == UDP_TEXT) { - /* Send udp header */ - bzero (&header, sizeof (header)); - header.dg_sent = htons (1); - header.req_id = ctx->count; - } - - r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, cmd, params[i].key); - memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf); - if (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = r; - writev (ctx->sock, iov, 2); - } - else { - write (ctx->sock, read_buf, r); - } - - /* Read reply from server */ - retries = 0; - while (ctx->protocol == UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - return SERVER_TIMEOUT; - } - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - return SERVER_ERROR; - } - memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf); - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count); - retries++; - /* Not our reply packet */ - continue; - } - break; - } - if (ctx->protocol != UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_read: timeout waiting reply"); - return SERVER_TIMEOUT; - } - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - - if (r > 0) { - sum += r; - read_buf[r] = 0; - r = memc_parse_header (read_buf, &datalen, &p); - if (r < 0) { - memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply"); - return SERVER_ERROR; - } - else if (r == 0) { - memc_log (ctx, __LINE__, "memc_read: record does not exists"); - return NOT_EXISTS; - } - - if (datalen != params[i].bufsize) { - memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", params[i].bufsize, datalen); - return WRONG_LENGTH; - } - - /* Subtract from sum parsed header's length */ - sum -= p - read_buf; - /* Check if we already have all data in buffer */ - if (sum >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) { - /* Store all data in param's buffer */ - memcpy (params[i].buf, p, datalen); - /* Increment count */ - ctx->count++; - return OK; - } - else { - /* Store this part of data in param's buffer */ - memcpy (params[i].buf, p, sum); - written += sum; - } - } - else { - memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r); - return SERVER_ERROR; - } - /* Read data from multiply datagrams */ - p = read_buf; - - while (sum < datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) { - retries = 0; - while (ctx->protocol == UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_read: timeout waiting reply"); - return SERVER_TIMEOUT; - } - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r); - return SERVER_ERROR; - } - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count); - retries ++; - /* Not our reply packet */ - continue; - } - } - if (ctx->protocol != UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_read: timeout waiting reply"); - return SERVER_TIMEOUT; - } - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - - p = read_buf; - sum += r; - if (r <= 0) { - break; - } - /* Copy received buffer to result buffer */ - while (r--) { - /* Break on reading END\r\n */ - if (strncmp (p, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { - break; - } - if (written < datalen) { - params[i].buf[written++] = *p++; - } - } - } - /* Increment count */ - ctx->count++; + ctx->cmd = cmd; + ctx->op = CMD_READ; + ctx->param = ¶ms[i]; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); } return OK; @@ -367,97 +558,13 @@ memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, siz memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire) { - char read_buf[READ_BUFSIZ]; - int i, retries, ofl; - ssize_t r; - struct memc_udp_header header; - struct iovec iov[4]; - + int i; for (i = 0; i < *nelem; i++) { - if (ctx->protocol == UDP_TEXT) { - /* Send udp header */ - bzero (&header, sizeof (header)); - header.dg_sent = htons (1); - header.req_id = ctx->count; - } - - r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, cmd, params[i].key, expire, params[i].bufsize); - memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf); - /* Set socket blocking */ - ofl = fcntl(ctx->sock, F_GETFL, 0); - fcntl(ctx->sock, F_SETFL, ofl & (~O_NONBLOCK)); - - if (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = r; - iov[2].iov_base = params[i].buf; - iov[2].iov_len = params[i].bufsize; - iov[3].iov_base = CRLF; - iov[3].iov_len = sizeof (CRLF) - 1; - writev (ctx->sock, iov, 4); - } - else { - iov[0].iov_base = read_buf; - iov[0].iov_len = r; - iov[1].iov_base = params[i].buf; - iov[1].iov_len = params[i].bufsize; - iov[2].iov_base = CRLF; - iov[2].iov_len = sizeof (CRLF) - 1; - writev (ctx->sock, iov, 3); - } - - /* Restore socket mode */ - fcntl(ctx->sock, F_SETFL, ofl); - /* Read reply from server */ - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_write: server timeout while reading reply"); - return SERVER_ERROR; - } - /* Read header */ - retries = 0; - while (ctx->protocol == UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_write: timeout waiting reply"); - return SERVER_TIMEOUT; - } - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - return SERVER_ERROR; - } - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - retries ++; - /* Not our reply packet */ - continue; - } - break; - } - if (ctx->protocol != UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - memc_log (ctx, __LINE__, "memc_write: timeout waiting reply"); - return SERVER_TIMEOUT; - } - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - /* Increment count */ - ctx->count++; - - if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { - continue; - } - else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) { - return CLIENT_ERROR; - } - else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) { - return EXISTS; - } - else { - return SERVER_ERROR; - } + ctx->cmd = cmd; + ctx->op = CMD_WRITE; + ctx->param = ¶ms[i]; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); } return OK; @@ -468,71 +575,14 @@ memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, si memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem) { - char read_buf[READ_BUFSIZ]; - int i, retries; - ssize_t r; - struct memc_udp_header header; - struct iovec iov[2]; + int i; for (i = 0; i < *nelem; i++) { - if (ctx->protocol == UDP_TEXT) { - /* Send udp header */ - bzero (&header, sizeof (header)); - header.dg_sent = htons(1); - header.req_id = ctx->count; - } - - r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, params[i].key); - memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf); - if (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = r; - writev (ctx->sock, iov, 2); - } - else { - write (ctx->sock, read_buf, r); - } - - /* Read reply from server */ - retries = 0; - while (ctx->protocol == UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - return SERVER_TIMEOUT; - } - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - return SERVER_ERROR; - } - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - retries ++; - /* Not our reply packet */ - continue; - } - break; - } - if (ctx->protocol != UDP_TEXT) { - if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) { - return SERVER_TIMEOUT; - } - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - - /* Increment count */ - ctx->count++; - if (strncmp (read_buf, DELETED_TRAILER, sizeof (DELETED_TRAILER) - 1) == 0) { - continue; - } - else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) { - return NOT_EXISTS; - } - else { - return SERVER_ERROR; - } + ctx->cmd = "delete"; + ctx->op = CMD_DELETE; + ctx->param = ¶ms[i]; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); } return OK; @@ -630,6 +680,11 @@ memc_init_ctx (memcached_ctx_t *ctx) ctx->count = 0; ctx->alive = 1; + ctx->op = CMD_NULL; + /* Set default callback */ + if (ctx->callback == NULL) { + ctx->callback = common_memc_callback; + } switch (ctx->protocol) { case UDP_TEXT: @@ -675,6 +730,7 @@ int memc_close_ctx (memcached_ctx_t *ctx) { if (ctx != NULL && ctx->sock != -1) { + event_del (&ctx->mem_ev); return close (ctx->sock); } diff --git a/memcached.h b/memcached.h index c0ae896e3..f3872456d 100644 --- a/memcached.h +++ b/memcached.h @@ -3,11 +3,15 @@ #include <sys/types.h> #include <netinet/in.h> +#include <sys/time.h> +#include <time.h> #define MAXKEYLEN 250 #define MEMC_OPT_DEBUG 0x1 +struct event; + typedef enum memc_error { OK, BAD_COMMAND, @@ -27,26 +31,51 @@ typedef enum memc_proto { TCP_BIN } memc_proto_t; +typedef enum memc_op { + CMD_NULL, + CMD_CONNECT, + CMD_READ, + CMD_WRITE, + CMD_DELETE, +} memc_opt_t; + +typedef struct memcached_param_s { + char key[MAXKEYLEN]; + u_char *buf; + size_t bufsize; + size_t bufpos; + int expire; +} memcached_param_t; + + /* Port must be in network byte order */ typedef struct memcached_ctx_s { memc_proto_t protocol; struct in_addr addr; uint16_t port; int sock; - int timeout; + struct timeval timeout; /* Counter that is used for memcached operations in network byte order */ uint16_t count; /* Flag that signalize that this memcached is alive */ short alive; /* Options that can be specified for memcached connection */ short options; + /* Current operation */ + memc_opt_t op; + /* Event structure */ + struct event mem_ev; + /* Current command */ + const char *cmd; + /* Current param */ + memcached_param_t *param; + /* Callback for current operation */ + void (*callback) (struct memcached_ctx_s *ctx, memc_error_t error, void *data); + /* Data for callback function */ + void *callback_data; } memcached_ctx_t; -typedef struct memcached_param_s { - char key[MAXKEYLEN]; - u_char *buf; - size_t bufsize; -} memcached_param_t; +typedef void (*memcached_callback_t) (memcached_ctx_t *ctx, memc_error_t error, void *data); /* * Initialize connection to memcached server: @@ -196,7 +196,7 @@ pass_signal_worker (struct workq *workers, int signo) void convert_to_lowercase (char *str, unsigned int size) { - while (size --) { + while (size--) { *str = tolower (*str ++); } } @@ -277,7 +277,7 @@ init_title(int argc, char *argv[], char *envp[]) if (!end_of_buffer) return 0; - char **new_environ = malloc ((i + 1) * sizeof (envp[0])); + char **new_environ = g_malloc ((i + 1) * sizeof (envp[0])); if (!new_environ) return 0; @@ -375,7 +375,7 @@ pidfile_open(const char *path, mode_t mode, pid_t *pidptr) int error, fd, len, count; struct timespec rqtp; - pfh = malloc(sizeof(*pfh)); + pfh = g_malloc(sizeof(*pfh)); if (pfh == NULL) return (NULL); |