aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-06-19 18:55:56 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-06-19 18:55:56 +0400
commitdaca72b007e666fb9abebcfa7c27ed6ddcadfd58 (patch)
treeed9ce40de8c2aead8d2d542651600914a3969de7
parente92bfae6a160b187f47092074c7f49989f71950d (diff)
downloadrspamd-daca72b007e666fb9abebcfa7c27ed6ddcadfd58.tar.gz
rspamd-daca72b007e666fb9abebcfa7c27ed6ddcadfd58.zip
* Rewrite memcached library to work with events (async model)
* Add simple test for new memcached library * Use glib variants of malloc and free functions in rspamd
-rw-r--r--cfg_utils.c6
-rw-r--r--main.c20
-rw-r--r--memcached-test.c79
-rw-r--r--memcached.c726
-rw-r--r--memcached.h41
-rw-r--r--util.c6
6 files changed, 521 insertions, 357 deletions
diff --git a/cfg_utils.c b/cfg_utils.c
index d682310b0..277a8d8e6 100644
--- a/cfg_utils.c
+++ b/cfg_utils.c
@@ -129,13 +129,13 @@ void
free_config (struct config_file *cfg)
{
if (cfg->pid_file) {
- free (cfg->pid_file);
+ g_free (cfg->pid_file);
}
if (cfg->temp_dir) {
- free (cfg->temp_dir);
+ g_free (cfg->temp_dir);
}
if (cfg->bind_host) {
- free (cfg->bind_host);
+ g_free (cfg->bind_host);
}
}
diff --git a/main.c b/main.c
index 8cc906b09..c9aa19dfb 100644
--- a/main.c
+++ b/main.c
@@ -61,11 +61,11 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
FILE *f;
struct config_file *tmp_cfg;
/* Starting worker process */
- cur = (struct rspamd_worker *)malloc (sizeof (struct rspamd_worker));
+ cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
if (cur) {
/* Reconfig needed */
if (reconfig) {
- tmp_cfg = (struct config_file *) malloc (sizeof (struct config_file));
+ tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file));
if (tmp_cfg) {
cfg_file = strdup (rspamd->cfg->cfg_name);
bzero (tmp_cfg, sizeof (struct config_file));
@@ -83,7 +83,7 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
}
else {
free_config (rspamd->cfg);
- free (rspamd->cfg);
+ g_free (rspamd->cfg);
rspamd->cfg = tmp_cfg;
rspamd->cfg->cfg_name = cfg_file;
}
@@ -130,9 +130,9 @@ main (int argc, char **argv)
FILE *f;
pid_t wrk;
- rspamd = (struct rspamd_main *)malloc (sizeof (struct rspamd_main));
+ rspamd = (struct rspamd_main *)g_malloc (sizeof (struct rspamd_main));
bzero (rspamd, sizeof (struct rspamd_main));
- cfg = (struct config_file *)malloc (sizeof (struct config_file));
+ cfg = (struct config_file *)g_malloc (sizeof (struct config_file));
rspamd->cfg = cfg;
if (!rspamd || !rspamd->cfg) {
fprintf(stderr, "Cannot allocate memory\n");
@@ -209,7 +209,7 @@ main (int argc, char **argv)
}
}
else {
- un_addr = (struct sockaddr_un *) malloc (sizeof (struct sockaddr_un));
+ un_addr = (struct sockaddr_un *) g_malloc (sizeof (struct sockaddr_un));
if (!un_addr || (listen_sock = make_unix_socket (rspamd->cfg->bind_host, un_addr)) == -1) {
msg_err ("main: cannot create unix listen socket. %m");
exit(-errno);
@@ -267,7 +267,7 @@ main (int argc, char **argv)
/* Fork another worker in replace of dead one */
fork_worker (rspamd, listen_sock, 0, cur->type);
}
- free (cur);
+ g_free (cur);
}
}
}
@@ -303,7 +303,7 @@ main (int argc, char **argv)
waitpid (cur->pid, &res, 0);
msg_debug ("main(cleaning): worker process %d terminated", cur->pid);
TAILQ_REMOVE(&rspamd->workers, cur, next);
- free(cur);
+ g_free(cur);
}
msg_info ("main: terminating...");
@@ -314,8 +314,8 @@ main (int argc, char **argv)
}
free_config (rspamd->cfg);
- free (rspamd->cfg);
- free (rspamd);
+ g_free (rspamd->cfg);
+ g_free (rspamd);
return (res);
}
diff --git a/memcached-test.c b/memcached-test.c
new file mode 100644
index 000000000..c258673bd
--- /dev/null
+++ b/memcached-test.c
@@ -0,0 +1,79 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <errno.h>
+#include <string.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <event.h>
+
+#include "upstream.h"
+#include "memcached.h"
+
+#define HOST "127.0.0.1"
+#define PORT 11211
+
+memcached_param_t cur_param;
+
+static void
+test_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ int s;
+ int r;
+ int *num = ((int *)data);
+ printf ("result of memc command '%s' is '%s'\n", ctx->cmd, memc_strerror (error));
+ /* Connect */
+ if (*num == 0) {
+ printf ("Setting value to memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf);
+ s = 1;
+ r = memc_set (ctx, &cur_param, &s, 60);
+ (*num)++;
+ }
+ else if (*num == 1) {
+ printf ("Getting value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf);
+ s = 1;
+ r = memc_get (ctx, &cur_param, &s);
+ (*num)++;
+ }
+ else {
+ printf ("Got value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf);
+ event_loopexit (NULL);
+ }
+}
+
+
+int
+main (int argc, char **argv)
+{
+ memcached_ctx_t mctx;
+ char *addr, buf[512];
+ int num = 0;
+
+ event_init ();
+ strcpy (cur_param.key, "testkey");
+ strcpy (buf, "test_value");
+ cur_param.buf = buf;
+ cur_param.bufsize = sizeof ("test_value") - 1;
+
+ if (argc == 2) {
+ addr = argv[1];
+ }
+ else {
+ addr = HOST;
+ }
+
+ mctx.protocol = TCP_TEXT;
+ mctx.timeout.tv_sec = 1;
+ mctx.timeout.tv_usec = 0;
+ mctx.port = htons (PORT);
+ mctx.options = MEMC_OPT_DEBUG;
+ mctx.callback = test_memc_callback;
+ /* XXX: it is wrong to use local variable pointer here */
+ mctx.callback_data = (void *)&num;
+ inet_aton (addr, &mctx.addr);
+
+ memc_init_ctx (&mctx);
+
+ event_loop (0);
+ return 0;
+}
diff --git a/memcached.c b/memcached.c
index 91aa21fdb..566fe659f 100644
--- a/memcached.c
+++ b/memcached.c
@@ -21,6 +21,7 @@
#include <errno.h>
#include <fcntl.h>
#include <sys/uio.h>
+#include <event.h>
#include "memcached.h"
@@ -46,49 +47,381 @@ struct memc_udp_header
uint16_t unused;
};
+static void socket_callback (int fd, short what, void *arg);
+static int memc_parse_header (char *buf, size_t *len, char **end);
+
/*
- * Poll file descriptor for read or write during specified timeout
+ * Write to syslog if OPT_DEBUG is specified
*/
-static int
-poll_d (int fd, u_char want_read, u_char want_write, int timeout)
+static void
+memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...)
+{
+ va_list args;
+ if (ctx->options & MEMC_OPT_DEBUG) {
+ va_start (args, fmt);
+ syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
+ vsyslog (LOG_DEBUG, fmt, args);
+ va_end (args);
+ }
+}
+
+/*
+ * Callback for write command
+ */
+static void
+write_handler (int fd, short what, memcached_ctx_t *ctx)
+{
+ char read_buf[READ_BUFSIZ];
+ int retries;
+ ssize_t r;
+ struct memc_udp_header header;
+ struct iovec iov[4];
+
+ /* Write something to memcached */
+ if (what == EV_WRITE) {
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+
+ r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize);
+ memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf);
+
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ if (ctx->param->bufpos == 0) {
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ }
+ else {
+ iov[1].iov_base = NULL;
+ iov[1].iov_len = 0;
+ }
+ iov[2].iov_base = ctx->param->buf + ctx->param->bufpos;
+ iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+ iov[3].iov_base = CRLF;
+ iov[3].iov_len = sizeof (CRLF) - 1;
+ ctx->param->bufpos = writev (ctx->sock, iov, 4);
+ }
+ else {
+ iov[0].iov_base = read_buf;
+ iov[0].iov_len = r;
+ iov[1].iov_base = ctx->param->buf + ctx->param->bufpos;
+ iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+ iov[2].iov_base = CRLF;
+ iov[2].iov_len = sizeof (CRLF) - 1;
+ ctx->param->bufpos = writev (ctx->sock, iov, 3);
+ }
+ if (ctx->param->bufpos == ctx->param->bufsize) {
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else {
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ }
+ else if (what == EV_READ) {
+ /* Read header */
+ retries = 0;
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ event_del (&ctx->mem_ev);
+ }
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ retries ++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+ /* Increment count */
+ ctx->count++;
+ if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, EXISTS, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ event_del (&ctx->mem_ev);
+ }
+ else if (what == EV_TIMEOUT) {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ event_del (&ctx->mem_ev);
+ }
+}
+
+/*
+ * Callback for read command
+ */
+static void
+read_handler (int fd, short what, memcached_ctx_t *ctx)
{
- int r;
- struct pollfd fds[1];
+ char read_buf[READ_BUFSIZ];
+ char *p;
+ ssize_t r;
+ size_t datalen;
+ struct memc_udp_header header;
+ struct iovec iov[2];
+ int retries = 0;
- fds->fd = fd;
- fds->revents = 0;
- fds->events = 0;
+ if (what == EV_WRITE) {
+ /* Send command to memcached */
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
- if (want_read != 0) {
- fds->events |= POLLIN;
+ r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key);
+ memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf);
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ writev (ctx->sock, iov, 2);
+ }
+ else {
+ write (ctx->sock, read_buf, r);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
+ retries++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+
+ if (r > 0) {
+ read_buf[r] = 0;
+ if (ctx->param->bufpos == 0) {
+ r = memc_parse_header (read_buf, &datalen, &p);
+ if (r < 0) {
+ memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ goto cleanup;
+ }
+ else if (r == 0) {
+ memc_log (ctx, __LINE__, "memc_read: record does not exists");
+ ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+ goto cleanup;
+ }
+
+ if (datalen > ctx->param->bufsize) {
+ memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen);
+ ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data);
+ goto cleanup;
+ }
+ /* Check if we already have all data in buffer */
+ if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
+ /* Store all data in param's buffer */
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen);
+ /* Increment count */
+ ctx->count++;
+ ctx->callback (ctx, OK, ctx->callback_data);
+ goto cleanup;
+ }
+ /* Subtract from sum parsed header's length */
+ r -= p - read_buf;
+ }
+ else {
+ p = read_buf;
+ }
+
+ if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2,
+ END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
+ r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2;
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ goto cleanup;
+ }
+ /* Store this part of data in param's buffer */
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+ ctx->param->bufpos += r;
+ }
+ else {
+ memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ goto cleanup;
+ }
+
+ ctx->count++;
}
- if (want_write != 0) {
- fds->events |= POLLOUT;
+ else if (what == EV_TIMEOUT) {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ goto cleanup;
}
- while ((r = poll(fds, 1, timeout)) < 0) {
- if (errno != EINTR)
- break;
- }
+
+ return;
- return r;
+cleanup:
+ event_del (&ctx->mem_ev);
}
/*
- * Write to syslog if OPT_DEBUG is specified
+ * Callback for delete command
*/
static void
-memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...)
+delete_handler (int fd, short what, memcached_ctx_t *ctx)
{
- va_list args;
- if (ctx->options & MEMC_OPT_DEBUG) {
- va_start (args, fmt);
- syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
- vsyslog (LOG_DEBUG, fmt, args);
- va_end (args);
+ char read_buf[READ_BUFSIZ];
+ int retries;
+ ssize_t r;
+ struct memc_udp_header header;
+ struct iovec iov[2];
+
+ /* Write something to memcached */
+ if (what == EV_WRITE) {
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+ r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key);
+ memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf);
+
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ ctx->param->bufpos = writev (ctx->sock, iov, 2);
+ }
+ else {
+ write (ctx->sock, read_buf, r);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ /* Read header */
+ retries = 0;
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ event_del (&ctx->mem_ev);
+ }
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ retries ++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+ /* Increment count */
+ ctx->count++;
+ if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ event_del (&ctx->mem_ev);
+ }
+ else if (what == EV_TIMEOUT) {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ event_del (&ctx->mem_ev);
}
}
/*
+ * Callback for our socket events
+ */
+static void
+socket_callback (int fd, short what, void *arg)
+{
+ memcached_ctx_t *ctx = (memcached_ctx_t *)arg;
+
+ switch (ctx->op) {
+ case CMD_NULL:
+ /* Do nothing here */
+ break;
+ case CMD_CONNECT:
+ /* We have write readiness after connect call, so reinit event */
+ ctx->cmd = "connect";
+ if (what == EV_WRITE) {
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, NULL);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ }
+ break;
+ case CMD_WRITE:
+ write_handler (fd, what, ctx);
+ break;
+ case CMD_READ:
+ read_handler (fd, what, ctx);
+ break;
+ case CMD_DELETE:
+ delete_handler (fd, what, ctx);
+ break;
+ }
+}
+
+/*
+ * Common callback function for memcached operations if no user's callback is specified
+ */
+static void
+common_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error));
+}
+
+/*
* Make socket for udp connection
*/
static int
@@ -117,6 +450,9 @@ memc_make_udp_sock (memcached_ctx_t *ctx)
* Call connect to set default destination for datagrams
* May not block
*/
+ ctx->op = CMD_CONNECT;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, NULL);
return connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in));
}
@@ -153,16 +489,10 @@ memc_make_tcp_sock (memcached_ctx_t *ctx)
return -1;
}
}
- /* Get write readiness */
- if (poll_d (ctx->sock, 0, 1, ctx->timeout) == 1) {
- return 0;
- }
- else {
- memc_log (ctx, __LINE__, "memc_make_tcp_sock: poll() timeout");
- close (ctx->sock);
- ctx->sock = -1;
- return -1;
- }
+ ctx->op = CMD_CONNECT;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ return 0;
}
/*
@@ -202,160 +532,21 @@ memc_parse_header (char *buf, size_t *len, char **end)
return -1;
}
+
+
/*
* Common read command handler for memcached
*/
memc_error_t
memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem)
{
- char read_buf[READ_BUFSIZ];
- char *p;
- int i, retries;
- ssize_t r, sum = 0, written = 0;
- size_t datalen;
- struct memc_udp_header header;
- struct iovec iov[2];
-
+ int i;
for (i = 0; i < *nelem; i++) {
- if (ctx->protocol == UDP_TEXT) {
- /* Send udp header */
- bzero (&header, sizeof (header));
- header.dg_sent = htons (1);
- header.req_id = ctx->count;
- }
-
- r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, cmd, params[i].key);
- memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf);
- if (ctx->protocol == UDP_TEXT) {
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = r;
- writev (ctx->sock, iov, 2);
- }
- else {
- write (ctx->sock, read_buf, r);
- }
-
- /* Read reply from server */
- retries = 0;
- while (ctx->protocol == UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- return SERVER_TIMEOUT;
- }
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = READ_BUFSIZ;
- if ((r = readv (ctx->sock, iov, 2)) == -1) {
- return SERVER_ERROR;
- }
- memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
- if (header.req_id != ctx->count && retries < MAX_RETRIES) {
- memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
- retries++;
- /* Not our reply packet */
- continue;
- }
- break;
- }
- if (ctx->protocol != UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_read: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
- }
-
- if (r > 0) {
- sum += r;
- read_buf[r] = 0;
- r = memc_parse_header (read_buf, &datalen, &p);
- if (r < 0) {
- memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
- return SERVER_ERROR;
- }
- else if (r == 0) {
- memc_log (ctx, __LINE__, "memc_read: record does not exists");
- return NOT_EXISTS;
- }
-
- if (datalen != params[i].bufsize) {
- memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", params[i].bufsize, datalen);
- return WRONG_LENGTH;
- }
-
- /* Subtract from sum parsed header's length */
- sum -= p - read_buf;
- /* Check if we already have all data in buffer */
- if (sum >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
- /* Store all data in param's buffer */
- memcpy (params[i].buf, p, datalen);
- /* Increment count */
- ctx->count++;
- return OK;
- }
- else {
- /* Store this part of data in param's buffer */
- memcpy (params[i].buf, p, sum);
- written += sum;
- }
- }
- else {
- memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
- return SERVER_ERROR;
- }
- /* Read data from multiply datagrams */
- p = read_buf;
-
- while (sum < datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
- retries = 0;
- while (ctx->protocol == UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_read: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = READ_BUFSIZ;
- if ((r = readv (ctx->sock, iov, 2)) == -1) {
- memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
- return SERVER_ERROR;
- }
- if (header.req_id != ctx->count && retries < MAX_RETRIES) {
- memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
- retries ++;
- /* Not our reply packet */
- continue;
- }
- }
- if (ctx->protocol != UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_read: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
- }
-
- p = read_buf;
- sum += r;
- if (r <= 0) {
- break;
- }
- /* Copy received buffer to result buffer */
- while (r--) {
- /* Break on reading END\r\n */
- if (strncmp (p, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
- break;
- }
- if (written < datalen) {
- params[i].buf[written++] = *p++;
- }
- }
- }
- /* Increment count */
- ctx->count++;
+ ctx->cmd = cmd;
+ ctx->op = CMD_READ;
+ ctx->param = &params[i];
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
}
return OK;
@@ -367,97 +558,13 @@ memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, siz
memc_error_t
memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire)
{
- char read_buf[READ_BUFSIZ];
- int i, retries, ofl;
- ssize_t r;
- struct memc_udp_header header;
- struct iovec iov[4];
-
+ int i;
for (i = 0; i < *nelem; i++) {
- if (ctx->protocol == UDP_TEXT) {
- /* Send udp header */
- bzero (&header, sizeof (header));
- header.dg_sent = htons (1);
- header.req_id = ctx->count;
- }
-
- r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, cmd, params[i].key, expire, params[i].bufsize);
- memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf);
- /* Set socket blocking */
- ofl = fcntl(ctx->sock, F_GETFL, 0);
- fcntl(ctx->sock, F_SETFL, ofl & (~O_NONBLOCK));
-
- if (ctx->protocol == UDP_TEXT) {
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = r;
- iov[2].iov_base = params[i].buf;
- iov[2].iov_len = params[i].bufsize;
- iov[3].iov_base = CRLF;
- iov[3].iov_len = sizeof (CRLF) - 1;
- writev (ctx->sock, iov, 4);
- }
- else {
- iov[0].iov_base = read_buf;
- iov[0].iov_len = r;
- iov[1].iov_base = params[i].buf;
- iov[1].iov_len = params[i].bufsize;
- iov[2].iov_base = CRLF;
- iov[2].iov_len = sizeof (CRLF) - 1;
- writev (ctx->sock, iov, 3);
- }
-
- /* Restore socket mode */
- fcntl(ctx->sock, F_SETFL, ofl);
- /* Read reply from server */
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_write: server timeout while reading reply");
- return SERVER_ERROR;
- }
- /* Read header */
- retries = 0;
- while (ctx->protocol == UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_write: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = READ_BUFSIZ;
- if ((r = readv (ctx->sock, iov, 2)) == -1) {
- return SERVER_ERROR;
- }
- if (header.req_id != ctx->count && retries < MAX_RETRIES) {
- retries ++;
- /* Not our reply packet */
- continue;
- }
- break;
- }
- if (ctx->protocol != UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- memc_log (ctx, __LINE__, "memc_write: timeout waiting reply");
- return SERVER_TIMEOUT;
- }
- r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
- }
- /* Increment count */
- ctx->count++;
-
- if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
- continue;
- }
- else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) {
- return CLIENT_ERROR;
- }
- else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) {
- return EXISTS;
- }
- else {
- return SERVER_ERROR;
- }
+ ctx->cmd = cmd;
+ ctx->op = CMD_WRITE;
+ ctx->param = &params[i];
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
}
return OK;
@@ -468,71 +575,14 @@ memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, si
memc_error_t
memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem)
{
- char read_buf[READ_BUFSIZ];
- int i, retries;
- ssize_t r;
- struct memc_udp_header header;
- struct iovec iov[2];
+ int i;
for (i = 0; i < *nelem; i++) {
- if (ctx->protocol == UDP_TEXT) {
- /* Send udp header */
- bzero (&header, sizeof (header));
- header.dg_sent = htons(1);
- header.req_id = ctx->count;
- }
-
- r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, params[i].key);
- memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf);
- if (ctx->protocol == UDP_TEXT) {
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = r;
- writev (ctx->sock, iov, 2);
- }
- else {
- write (ctx->sock, read_buf, r);
- }
-
- /* Read reply from server */
- retries = 0;
- while (ctx->protocol == UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- return SERVER_TIMEOUT;
- }
- iov[0].iov_base = &header;
- iov[0].iov_len = sizeof (struct memc_udp_header);
- iov[1].iov_base = read_buf;
- iov[1].iov_len = READ_BUFSIZ;
- if ((r = readv (ctx->sock, iov, 2)) == -1) {
- return SERVER_ERROR;
- }
- if (header.req_id != ctx->count && retries < MAX_RETRIES) {
- retries ++;
- /* Not our reply packet */
- continue;
- }
- break;
- }
- if (ctx->protocol != UDP_TEXT) {
- if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
- return SERVER_TIMEOUT;
- }
- r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
- }
-
- /* Increment count */
- ctx->count++;
- if (strncmp (read_buf, DELETED_TRAILER, sizeof (DELETED_TRAILER) - 1) == 0) {
- continue;
- }
- else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) {
- return NOT_EXISTS;
- }
- else {
- return SERVER_ERROR;
- }
+ ctx->cmd = "delete";
+ ctx->op = CMD_DELETE;
+ ctx->param = &params[i];
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
}
return OK;
@@ -630,6 +680,11 @@ memc_init_ctx (memcached_ctx_t *ctx)
ctx->count = 0;
ctx->alive = 1;
+ ctx->op = CMD_NULL;
+ /* Set default callback */
+ if (ctx->callback == NULL) {
+ ctx->callback = common_memc_callback;
+ }
switch (ctx->protocol) {
case UDP_TEXT:
@@ -675,6 +730,7 @@ int
memc_close_ctx (memcached_ctx_t *ctx)
{
if (ctx != NULL && ctx->sock != -1) {
+ event_del (&ctx->mem_ev);
return close (ctx->sock);
}
diff --git a/memcached.h b/memcached.h
index c0ae896e3..f3872456d 100644
--- a/memcached.h
+++ b/memcached.h
@@ -3,11 +3,15 @@
#include <sys/types.h>
#include <netinet/in.h>
+#include <sys/time.h>
+#include <time.h>
#define MAXKEYLEN 250
#define MEMC_OPT_DEBUG 0x1
+struct event;
+
typedef enum memc_error {
OK,
BAD_COMMAND,
@@ -27,26 +31,51 @@ typedef enum memc_proto {
TCP_BIN
} memc_proto_t;
+typedef enum memc_op {
+ CMD_NULL,
+ CMD_CONNECT,
+ CMD_READ,
+ CMD_WRITE,
+ CMD_DELETE,
+} memc_opt_t;
+
+typedef struct memcached_param_s {
+ char key[MAXKEYLEN];
+ u_char *buf;
+ size_t bufsize;
+ size_t bufpos;
+ int expire;
+} memcached_param_t;
+
+
/* Port must be in network byte order */
typedef struct memcached_ctx_s {
memc_proto_t protocol;
struct in_addr addr;
uint16_t port;
int sock;
- int timeout;
+ struct timeval timeout;
/* Counter that is used for memcached operations in network byte order */
uint16_t count;
/* Flag that signalize that this memcached is alive */
short alive;
/* Options that can be specified for memcached connection */
short options;
+ /* Current operation */
+ memc_opt_t op;
+ /* Event structure */
+ struct event mem_ev;
+ /* Current command */
+ const char *cmd;
+ /* Current param */
+ memcached_param_t *param;
+ /* Callback for current operation */
+ void (*callback) (struct memcached_ctx_s *ctx, memc_error_t error, void *data);
+ /* Data for callback function */
+ void *callback_data;
} memcached_ctx_t;
-typedef struct memcached_param_s {
- char key[MAXKEYLEN];
- u_char *buf;
- size_t bufsize;
-} memcached_param_t;
+typedef void (*memcached_callback_t) (memcached_ctx_t *ctx, memc_error_t error, void *data);
/*
* Initialize connection to memcached server:
diff --git a/util.c b/util.c
index 37ceacd44..52591983a 100644
--- a/util.c
+++ b/util.c
@@ -196,7 +196,7 @@ pass_signal_worker (struct workq *workers, int signo)
void convert_to_lowercase (char *str, unsigned int size)
{
- while (size --) {
+ while (size--) {
*str = tolower (*str ++);
}
}
@@ -277,7 +277,7 @@ init_title(int argc, char *argv[], char *envp[])
if (!end_of_buffer)
return 0;
- char **new_environ = malloc ((i + 1) * sizeof (envp[0]));
+ char **new_environ = g_malloc ((i + 1) * sizeof (envp[0]));
if (!new_environ)
return 0;
@@ -375,7 +375,7 @@ pidfile_open(const char *path, mode_t mode, pid_t *pidptr)
int error, fd, len, count;
struct timespec rqtp;
- pfh = malloc(sizeof(*pfh));
+ pfh = g_malloc(sizeof(*pfh));
if (pfh == NULL)
return (NULL);