]> source.dussan.org Git - rspamd.git/commitdiff
* Rewrite memcached library to work with events (async model)
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 19 Jun 2008 14:55:56 +0000 (18:55 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Thu, 19 Jun 2008 14:55:56 +0000 (18:55 +0400)
* Add simple test for new memcached library
* Use glib variants of malloc and free functions in rspamd

cfg_utils.c
main.c
memcached-test.c [new file with mode: 0644]
memcached.c
memcached.h
util.c

index d682310b094d28e54094530746dc87f3d0e82a5c..277a8d8e650d31e3e54b35f805b759937ac978c2 100644 (file)
@@ -129,13 +129,13 @@ void
 free_config (struct config_file *cfg)
 {
        if (cfg->pid_file) {
-               free (cfg->pid_file);
+               g_free (cfg->pid_file);
        }
        if (cfg->temp_dir) {
-               free (cfg->temp_dir);
+               g_free (cfg->temp_dir);
        }
        if (cfg->bind_host) {
-               free (cfg->bind_host);
+               g_free (cfg->bind_host);
        }
 }
 
diff --git a/main.c b/main.c
index 8cc906b09b671168be707f1d9365878ca2ce5fbf..c9aa19dfbc23e92981fa2040239f85c43c9edd2e 100644 (file)
--- a/main.c
+++ b/main.c
@@ -61,11 +61,11 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
        FILE *f;
        struct config_file *tmp_cfg;
        /* Starting worker process */
-       cur = (struct rspamd_worker *)malloc (sizeof (struct rspamd_worker));
+       cur = (struct rspamd_worker *)g_malloc (sizeof (struct rspamd_worker));
        if (cur) {
                /* Reconfig needed */
                if (reconfig) {
-                       tmp_cfg = (struct config_file *) malloc (sizeof (struct config_file));
+                       tmp_cfg = (struct config_file *) g_malloc (sizeof (struct config_file));
                        if (tmp_cfg) {
                        cfg_file = strdup (rspamd->cfg->cfg_name);
                        bzero (tmp_cfg, sizeof (struct config_file));
@@ -83,7 +83,7 @@ fork_worker (struct rspamd_main *rspamd, int listen_sock, int reconfig, enum pro
                                        }
                                        else {
                                        free_config (rspamd->cfg);
-                                               free (rspamd->cfg);
+                                               g_free (rspamd->cfg);
                                                rspamd->cfg = tmp_cfg;
                                        rspamd->cfg->cfg_name = cfg_file;
                                        }
@@ -130,9 +130,9 @@ main (int argc, char **argv)
        FILE *f;
        pid_t wrk;
 
-       rspamd = (struct rspamd_main *)malloc (sizeof (struct rspamd_main));
+       rspamd = (struct rspamd_main *)g_malloc (sizeof (struct rspamd_main));
        bzero (rspamd, sizeof (struct rspamd_main));
-       cfg = (struct config_file *)malloc (sizeof (struct config_file));
+       cfg = (struct config_file *)g_malloc (sizeof (struct config_file));
        rspamd->cfg = cfg;
        if (!rspamd || !rspamd->cfg) {
                fprintf(stderr, "Cannot allocate memory\n");
@@ -209,7 +209,7 @@ main (int argc, char **argv)
                }
        }
        else {
-               un_addr = (struct sockaddr_un *) malloc (sizeof (struct sockaddr_un));
+               un_addr = (struct sockaddr_un *) g_malloc (sizeof (struct sockaddr_un));
                if (!un_addr || (listen_sock = make_unix_socket (rspamd->cfg->bind_host, un_addr)) == -1) {
                        msg_err ("main: cannot create unix listen socket. %m");
                        exit(-errno);
@@ -267,7 +267,7 @@ main (int argc, char **argv)
                                                /* Fork another worker in replace of dead one */
                                                fork_worker (rspamd, listen_sock, 0, cur->type);
                                        }
-                                       free (cur);
+                                       g_free (cur);
                                }
                        }
                }
@@ -303,7 +303,7 @@ main (int argc, char **argv)
                waitpid (cur->pid, &res, 0);
                msg_debug ("main(cleaning): worker process %d terminated", cur->pid);
                TAILQ_REMOVE(&rspamd->workers, cur, next);
-               free(cur);
+               g_free(cur);
        }
        
        msg_info ("main: terminating...");
@@ -314,8 +314,8 @@ main (int argc, char **argv)
        }
 
        free_config (rspamd->cfg);
-       free (rspamd->cfg);
-       free (rspamd);
+       g_free (rspamd->cfg);
+       g_free (rspamd);
 
        return (res);
 }
diff --git a/memcached-test.c b/memcached-test.c
new file mode 100644 (file)
index 0000000..c258673
--- /dev/null
@@ -0,0 +1,79 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <errno.h>
+#include <string.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <event.h>
+
+#include "upstream.h"
+#include "memcached.h"
+
+#define HOST "127.0.0.1"
+#define PORT 11211
+
+memcached_param_t cur_param;
+
+static void
+test_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{      
+       int s;
+       int r;
+       int *num = ((int *)data);
+       printf ("result of memc command '%s' is '%s'\n", ctx->cmd, memc_strerror (error));
+       /* Connect */
+       if (*num == 0) {
+               printf ("Setting value to memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf);
+               s = 1;
+               r = memc_set (ctx, &cur_param, &s, 60);
+               (*num)++;
+       }
+       else if (*num == 1) {
+               printf ("Getting value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf);
+               s = 1;
+               r = memc_get (ctx, &cur_param, &s);
+               (*num)++;
+       }
+       else {
+               printf ("Got value from memcached: %s -> %s\n", cur_param.key, (char *)cur_param.buf);
+               event_loopexit (NULL);
+       }
+}
+
+
+int 
+main (int argc, char **argv)
+{
+       memcached_ctx_t mctx;
+       char *addr, buf[512];
+       int num = 0;
+       
+       event_init ();
+       strcpy (cur_param.key, "testkey");
+       strcpy (buf, "test_value");
+       cur_param.buf = buf;
+       cur_param.bufsize = sizeof ("test_value") - 1;
+
+       if (argc == 2) {
+               addr = argv[1];
+       }
+       else {
+               addr = HOST;
+       }
+       
+       mctx.protocol = TCP_TEXT;
+       mctx.timeout.tv_sec = 1;
+       mctx.timeout.tv_usec = 0;
+       mctx.port = htons (PORT);
+       mctx.options = MEMC_OPT_DEBUG;
+       mctx.callback = test_memc_callback;
+       /* XXX: it is wrong to use local variable pointer here */
+       mctx.callback_data = (void *)&num;
+       inet_aton (addr, &mctx.addr);
+
+       memc_init_ctx (&mctx);
+       
+       event_loop (0);
+       return 0;
+}
index 91aa21fdba9e780e89ff969c44adca32ecb1c3b9..566fe659fcca45b283184cd2b096a52f89ff32f2 100644 (file)
@@ -21,6 +21,7 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <sys/uio.h>
+#include <event.h>
 
 #include "memcached.h"
 
@@ -46,48 +47,380 @@ struct memc_udp_header
        uint16_t unused;
 };
 
+static void socket_callback (int fd, short what, void *arg);
+static int memc_parse_header (char *buf, size_t *len, char **end);
+
 /*
- * Poll file descriptor for read or write during specified timeout
+ * Write to syslog if OPT_DEBUG is specified
  */
-static int
-poll_d (int fd, u_char want_read, u_char want_write, int timeout)
+static void
+memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...) 
+{
+       va_list args;
+       if (ctx->options & MEMC_OPT_DEBUG) {
+               va_start (args, fmt);
+               syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
+               vsyslog (LOG_DEBUG, fmt, args);
+               va_end (args);
+       }
+}
+
+/*
+ * Callback for write command
+ */
+static void
+write_handler (int fd, short what, memcached_ctx_t *ctx)
+{
+       char read_buf[READ_BUFSIZ];
+       int retries;
+       ssize_t r;
+       struct memc_udp_header header;
+       struct iovec iov[4];
+
+       /* Write something to memcached */
+       if (what == EV_WRITE) {
+               if (ctx->protocol == UDP_TEXT) {
+                       /* Send udp header */
+                       bzero (&header, sizeof (header));
+                       header.dg_sent = htons (1);
+                       header.req_id = ctx->count;
+               }
+
+               r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize);
+               memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf);
+
+               if (ctx->protocol == UDP_TEXT) {
+                       iov[0].iov_base = &header;
+                       iov[0].iov_len = sizeof (struct memc_udp_header);
+                       if (ctx->param->bufpos == 0) {
+                               iov[1].iov_base = read_buf;
+                               iov[1].iov_len = r;
+                       }
+                       else {
+                               iov[1].iov_base = NULL;
+                               iov[1].iov_len = 0;
+                       }
+                       iov[2].iov_base = ctx->param->buf + ctx->param->bufpos;
+                       iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+                       iov[3].iov_base = CRLF;
+                       iov[3].iov_len = sizeof (CRLF) - 1;
+                       ctx->param->bufpos = writev (ctx->sock, iov, 4);
+               }
+               else {
+                       iov[0].iov_base = read_buf;
+                       iov[0].iov_len = r;
+                       iov[1].iov_base = ctx->param->buf + ctx->param->bufpos;
+                       iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+                       iov[2].iov_base = CRLF;
+                       iov[2].iov_len = sizeof (CRLF) - 1;
+                       ctx->param->bufpos = writev (ctx->sock, iov, 3);        
+               }
+               if (ctx->param->bufpos == ctx->param->bufsize) {
+                       event_del (&ctx->mem_ev);
+                       event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+                       event_add (&ctx->mem_ev, &ctx->timeout);
+               }
+               else {
+                       event_del (&ctx->mem_ev);
+                       event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+                       event_add (&ctx->mem_ev, &ctx->timeout);
+               }
+       }
+       else if (what == EV_READ) {
+               /* Read header */
+               retries = 0;
+               while (ctx->protocol == UDP_TEXT) {
+                       iov[0].iov_base = &header;
+                       iov[0].iov_len = sizeof (struct memc_udp_header);
+                       iov[1].iov_base = read_buf;
+                       iov[1].iov_len = READ_BUFSIZ;
+                       if ((r = readv (ctx->sock, iov, 2)) == -1) {
+                               ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+                               event_del (&ctx->mem_ev);
+                       }
+                       if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+                               retries ++;
+                               /* Not our reply packet */
+                               continue;
+                       }
+                       break;
+               }
+               if (ctx->protocol != UDP_TEXT) {
+                       r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+               }
+               /* Increment count */
+               ctx->count++;
+               if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+                       ctx->callback (ctx, OK, ctx->callback_data);
+               }
+               else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) {
+                       ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data);
+               }
+               else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) {
+                       ctx->callback (ctx, EXISTS, ctx->callback_data);
+               }
+               else {
+                       ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+               }
+               event_del (&ctx->mem_ev);
+       }
+       else if (what == EV_TIMEOUT) {
+               ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+               event_del (&ctx->mem_ev);
+       }
+}
+
+/*
+ * Callback for read command
+ */
+static void
+read_handler (int fd, short what, memcached_ctx_t *ctx)
 {
-       int r;
-    struct pollfd fds[1];
+       char read_buf[READ_BUFSIZ];
+       char *p;
+       ssize_t r;
+       size_t datalen;
+       struct memc_udp_header header;
+       struct iovec iov[2];
+       int retries = 0;
        
-       fds->fd = fd;
-    fds->revents = 0;
-       fds->events = 0;
+       if (what == EV_WRITE) {
+               /* Send command to memcached */
+               if (ctx->protocol == UDP_TEXT) {
+                       /* Send udp header */
+                       bzero (&header, sizeof (header));
+                       header.dg_sent = htons (1);
+                       header.req_id = ctx->count;
+               }
 
-       if (want_read != 0) {
-               fds->events |= POLLIN;
+               r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key);
+               memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf);
+               if (ctx->protocol == UDP_TEXT) {
+                       iov[0].iov_base = &header;
+                       iov[0].iov_len = sizeof (struct memc_udp_header);
+                       iov[1].iov_base = read_buf;
+                       iov[1].iov_len = r;
+                       writev (ctx->sock, iov, 2);
+               }
+               else {
+                       write (ctx->sock, read_buf, r);
+               }
+               event_del (&ctx->mem_ev);
+               event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+               event_add (&ctx->mem_ev, &ctx->timeout);
+       }
+       else if (what == EV_READ) {
+               while (ctx->protocol == UDP_TEXT) {
+                       iov[0].iov_base = &header;
+                       iov[0].iov_len = sizeof (struct memc_udp_header);
+                       iov[1].iov_base = read_buf;
+                       iov[1].iov_len = READ_BUFSIZ;
+                       if ((r = readv (ctx->sock, iov, 2)) == -1) {
+                               ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+                       }
+                       memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
+                       if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+                               memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
+                               retries++;
+                               /* Not our reply packet */
+                               continue;
+                       }
+                       break;
+               }
+               if (ctx->protocol != UDP_TEXT) {
+                       r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+               }
+
+               if (r > 0) {
+                       read_buf[r] = 0;
+                       if (ctx->param->bufpos == 0) {
+                               r = memc_parse_header (read_buf, &datalen, &p);
+                               if (r < 0) {
+                                       memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
+                                       ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+                                       goto cleanup;
+                               }
+                               else if (r == 0) {
+                                       memc_log (ctx, __LINE__, "memc_read: record does not exists");
+                                       ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+                                       goto cleanup;
+                               }
+
+                               if (datalen > ctx->param->bufsize) {
+                                       memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen);
+                                       ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data);
+                                       goto cleanup;
+                               }
+                               /* Check if we already have all data in buffer */
+                               if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
+                                       /* Store all data in param's buffer */
+                                       memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen);
+                                       /* Increment count */
+                                       ctx->count++;
+                                       ctx->callback (ctx, OK, ctx->callback_data);
+                                       goto cleanup;
+                               }
+                               /* Subtract from sum parsed header's length */
+                               r -= p - read_buf;
+                       }
+                       else {
+                               p = read_buf;
+                       }
+                       
+                       if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2, 
+                                               END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
+                               r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2;
+                               memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+                               ctx->callback (ctx, OK, ctx->callback_data);
+                               goto cleanup;
+                       }
+                       /* Store this part of data in param's buffer */
+                       memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+                       ctx->param->bufpos += r;
+               }
+               else {
+                       memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
+                       ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+                       goto cleanup;
+               }
+
+               ctx->count++;
        }
-       if (want_write != 0) {
-               fds->events |= POLLOUT;
+       else if (what == EV_TIMEOUT) {
+               ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+               goto cleanup;
        }
-       while ((r = poll(fds, 1, timeout)) < 0) {
-               if (errno != EINTR)
-               break;
-    }
+       
+       return;
 
-       return r;
+cleanup:
+       event_del (&ctx->mem_ev);
 }
 
 /*
- * Write to syslog if OPT_DEBUG is specified
+ * Callback for delete command
  */
 static void
-memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...) 
+delete_handler (int fd, short what, memcached_ctx_t *ctx)
 {
-       va_list args;
-       if (ctx->options & MEMC_OPT_DEBUG) {
-               va_start (args, fmt);
-               syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
-               vsyslog (LOG_DEBUG, fmt, args);
-               va_end (args);
+       char read_buf[READ_BUFSIZ];
+       int retries;
+       ssize_t r;
+       struct memc_udp_header header;
+       struct iovec iov[2];
+
+       /* Write something to memcached */
+       if (what == EV_WRITE) {
+               if (ctx->protocol == UDP_TEXT) {
+                       /* Send udp header */
+                       bzero (&header, sizeof (header));
+                       header.dg_sent = htons (1);
+                       header.req_id = ctx->count;
+               }
+               r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key);
+               memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf);
+
+               if (ctx->protocol == UDP_TEXT) {
+                       iov[0].iov_base = &header;
+                       iov[0].iov_len = sizeof (struct memc_udp_header);
+                       iov[1].iov_base = read_buf;
+                       iov[1].iov_len = r;
+                       ctx->param->bufpos = writev (ctx->sock, iov, 2);
+               }
+               else {
+                       write (ctx->sock, read_buf, r);
+               }
+               event_del (&ctx->mem_ev);
+               event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+               event_add (&ctx->mem_ev, &ctx->timeout);
+       }
+       else if (what == EV_READ) {
+               /* Read header */
+               retries = 0;
+               while (ctx->protocol == UDP_TEXT) {
+                       iov[0].iov_base = &header;
+                       iov[0].iov_len = sizeof (struct memc_udp_header);
+                       iov[1].iov_base = read_buf;
+                       iov[1].iov_len = READ_BUFSIZ;
+                       if ((r = readv (ctx->sock, iov, 2)) == -1) {
+                               ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+                               event_del (&ctx->mem_ev);
+                       }
+                       if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+                               retries ++;
+                               /* Not our reply packet */
+                               continue;
+                       }
+                       break;
+               }
+               if (ctx->protocol != UDP_TEXT) {
+                       r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+               }
+               /* Increment count */
+               ctx->count++;
+               if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+                       ctx->callback (ctx, OK, ctx->callback_data);
+               }
+               else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) {
+                       ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+               }
+               else {
+                       ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+               }
+               event_del (&ctx->mem_ev);
+       }
+       else if (what == EV_TIMEOUT) {
+               ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+               event_del (&ctx->mem_ev);
        }
 }
 
+/*
+ * Callback for our socket events
+ */
+static void
+socket_callback (int fd, short what, void *arg)
+{
+       memcached_ctx_t *ctx = (memcached_ctx_t *)arg;
+
+       switch (ctx->op) {
+               case CMD_NULL:
+                       /* Do nothing here */
+                       break;
+               case CMD_CONNECT:
+                       /* We have write readiness after connect call, so reinit event */
+                       ctx->cmd = "connect";
+                       if (what == EV_WRITE) {
+                               event_del (&ctx->mem_ev);
+                               event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+                               event_add (&ctx->mem_ev, NULL);
+                               ctx->callback (ctx, OK, ctx->callback_data);
+                       }
+                       else {
+                               ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+                       }
+                       break;
+               case CMD_WRITE:
+                       write_handler (fd, what, ctx);
+                       break;
+               case CMD_READ:
+                       read_handler (fd, what, ctx);
+                       break;
+               case CMD_DELETE:
+                       delete_handler (fd, what, ctx);
+                       break;
+       }
+}
+
+/*
+ * Common callback function for memcached operations if no user's callback is specified
+ */
+static void
+common_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+       memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error));
+}
+
 /*
  * Make socket for udp connection
  */
@@ -117,6 +450,9 @@ memc_make_udp_sock (memcached_ctx_t *ctx)
         * Call connect to set default destination for datagrams 
         * May not block
         */
+       ctx->op = CMD_CONNECT;
+       event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+       event_add (&ctx->mem_ev, NULL);
        return connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in));
 }
 
@@ -153,16 +489,10 @@ memc_make_tcp_sock (memcached_ctx_t *ctx)
                        return -1;
                }
        }
-       /* Get write readiness */
-       if (poll_d (ctx->sock, 0, 1, ctx->timeout) == 1) {
-               return 0;
-       } 
-       else {
-               memc_log (ctx, __LINE__, "memc_make_tcp_sock: poll() timeout");
-               close (ctx->sock);
-               ctx->sock = -1;
-               return -1;
-       }
+       ctx->op = CMD_CONNECT;
+       event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+       event_add (&ctx->mem_ev, &ctx->timeout);
+       return 0;
 }
 
 /* 
@@ -202,160 +532,21 @@ memc_parse_header (char *buf, size_t *len, char **end)
 
        return -1;
 }
+
+
 /*
  * Common read command handler for memcached
  */
 memc_error_t
 memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem)
 {
-       char read_buf[READ_BUFSIZ];
-       char *p;
-       int i, retries;
-       ssize_t r, sum = 0, written = 0;
-       size_t datalen;
-       struct memc_udp_header header;
-       struct iovec iov[2];
-       
+       int i;
        for (i = 0; i < *nelem; i++) {
-               if (ctx->protocol == UDP_TEXT) {
-                       /* Send udp header */
-                       bzero (&header, sizeof (header));
-                       header.dg_sent = htons (1);
-                       header.req_id = ctx->count;
-               }
-
-               r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, cmd, params[i].key);
-               memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf);
-               if (ctx->protocol == UDP_TEXT) {
-                       iov[0].iov_base = &header;
-                       iov[0].iov_len = sizeof (struct memc_udp_header);
-                       iov[1].iov_base = read_buf;
-                       iov[1].iov_len = r;
-                       writev (ctx->sock, iov, 2);
-               }
-               else {
-                       write (ctx->sock, read_buf, r);
-               }
-
-               /* Read reply from server */
-               retries = 0;
-               while (ctx->protocol == UDP_TEXT) {
-                       if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
-                               return SERVER_TIMEOUT;
-                       }
-                       iov[0].iov_base = &header;
-                       iov[0].iov_len = sizeof (struct memc_udp_header);
-                       iov[1].iov_base = read_buf;
-                       iov[1].iov_len = READ_BUFSIZ;
-                       if ((r = readv (ctx->sock, iov, 2)) == -1) {
-                               return SERVER_ERROR;
-                       }
-                       memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
-                       if (header.req_id != ctx->count && retries < MAX_RETRIES) {
-                               memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
-                               retries++;
-                               /* Not our reply packet */
-                               continue;
-                       }
-                       break;
-               }
-               if (ctx->protocol != UDP_TEXT) {
-                       if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
-                               memc_log (ctx, __LINE__, "memc_read: timeout waiting reply");
-                               return SERVER_TIMEOUT;
-                       }
-                       r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
-               }
-
-               if (r > 0) {
-                       sum += r;
-                       read_buf[r] = 0;
-                       r = memc_parse_header (read_buf, &datalen, &p);
-                       if (r < 0) {
-                               memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
-                               return SERVER_ERROR;
-                       }
-                       else if (r == 0) {
-                               memc_log (ctx, __LINE__, "memc_read: record does not exists");
-                               return NOT_EXISTS;
-                       }
-
-                       if (datalen != params[i].bufsize) {
-                               memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", params[i].bufsize, datalen);
-                               return WRONG_LENGTH;
-                       }
-
-                       /* Subtract from sum parsed header's length */
-                       sum -= p - read_buf;
-                       /* Check if we already have all data in buffer */
-                       if (sum >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
-                               /* Store all data in param's buffer */
-                               memcpy (params[i].buf, p, datalen);
-                               /* Increment count */
-                               ctx->count++;
-                               return OK;
-                       }
-                       else {
-                               /* Store this part of data in param's buffer */
-                               memcpy (params[i].buf, p, sum);
-                               written += sum;
-                       }
-               }
-               else {
-                       memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
-                       return SERVER_ERROR;
-               }
-               /* Read data from multiply datagrams */
-               p = read_buf;
-
-               while (sum < datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
-                       retries = 0;
-                       while (ctx->protocol == UDP_TEXT) {
-                               if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
-                                       memc_log (ctx, __LINE__, "memc_read: timeout waiting reply");
-                                       return SERVER_TIMEOUT;
-                               }
-                               iov[0].iov_base = &header;
-                               iov[0].iov_len = sizeof (struct memc_udp_header);
-                               iov[1].iov_base = read_buf;
-                               iov[1].iov_len = READ_BUFSIZ;
-                               if ((r = readv (ctx->sock, iov, 2)) == -1) {
-                                       memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
-                                       return SERVER_ERROR;
-                               }
-                               if (header.req_id != ctx->count && retries < MAX_RETRIES) {
-                                       memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
-                                       retries ++;
-                                       /* Not our reply packet */
-                                       continue;
-                               }
-                       }
-                       if (ctx->protocol != UDP_TEXT) {
-                               if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
-                                       memc_log (ctx, __LINE__, "memc_read: timeout waiting reply");
-                                       return SERVER_TIMEOUT;
-                               }
-                               r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
-                       }
-                       
-                       p = read_buf;
-                       sum += r;
-                       if (r <= 0) {
-                               break;
-                       }
-                       /* Copy received buffer to result buffer */
-                       while (r--) {
-                               /* Break on reading END\r\n */
-                               if (strncmp (p, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
-                                       break;
-                               }
-                               if (written < datalen) {
-                                       params[i].buf[written++] = *p++;
-                               }
-                       }
-               }
-               /* Increment count */
-               ctx->count++;
+               ctx->cmd = cmd;
+               ctx->op = CMD_READ;
+               ctx->param = &params[i];
+               event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+               event_add (&ctx->mem_ev, &ctx->timeout);
        }
 
        return OK;
@@ -367,97 +558,13 @@ memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, siz
 memc_error_t
 memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire)
 {
-       char read_buf[READ_BUFSIZ];
-       int i, retries, ofl;
-       ssize_t r;
-       struct memc_udp_header header;
-       struct iovec iov[4];
-       
+       int i;  
        for (i = 0; i < *nelem; i++) {
-               if (ctx->protocol == UDP_TEXT) {
-                       /* Send udp header */
-                       bzero (&header, sizeof (header));
-                       header.dg_sent = htons (1);
-                       header.req_id = ctx->count;
-               }
-
-               r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, cmd, params[i].key, expire, params[i].bufsize);
-               memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf);
-               /* Set socket blocking */
-               ofl = fcntl(ctx->sock, F_GETFL, 0);
-               fcntl(ctx->sock, F_SETFL, ofl & (~O_NONBLOCK));
-
-               if (ctx->protocol == UDP_TEXT) {
-                       iov[0].iov_base = &header;
-                       iov[0].iov_len = sizeof (struct memc_udp_header);
-                       iov[1].iov_base = read_buf;
-                       iov[1].iov_len = r;
-                       iov[2].iov_base = params[i].buf;
-                       iov[2].iov_len = params[i].bufsize;
-                       iov[3].iov_base = CRLF;
-                       iov[3].iov_len = sizeof (CRLF) - 1;
-                       writev (ctx->sock, iov, 4);
-               }
-               else {
-                       iov[0].iov_base = read_buf;
-                       iov[0].iov_len = r;
-                       iov[1].iov_base = params[i].buf;
-                       iov[1].iov_len = params[i].bufsize;
-                       iov[2].iov_base = CRLF;
-                       iov[2].iov_len = sizeof (CRLF) - 1;
-                       writev (ctx->sock, iov, 3);     
-               }
-               
-               /* Restore socket mode */
-               fcntl(ctx->sock, F_SETFL, ofl);
-               /* Read reply from server */
-               if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
-                       memc_log (ctx, __LINE__, "memc_write: server timeout while reading reply");
-                       return SERVER_ERROR;
-               }
-               /* Read header */
-               retries = 0;
-               while (ctx->protocol == UDP_TEXT) {
-                       if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
-                               memc_log (ctx, __LINE__, "memc_write: timeout waiting reply");
-                               return SERVER_TIMEOUT;
-                       }
-                       iov[0].iov_base = &header;
-                       iov[0].iov_len = sizeof (struct memc_udp_header);
-                       iov[1].iov_base = read_buf;
-                       iov[1].iov_len = READ_BUFSIZ;
-                       if ((r = readv (ctx->sock, iov, 2)) == -1) {
-                               return SERVER_ERROR;
-                       }
-                       if (header.req_id != ctx->count && retries < MAX_RETRIES) {
-                               retries ++;
-                               /* Not our reply packet */
-                               continue;
-                       }
-                       break;
-               }
-               if (ctx->protocol != UDP_TEXT) {
-                       if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
-                               memc_log (ctx, __LINE__, "memc_write: timeout waiting reply");
-                               return SERVER_TIMEOUT;
-                       }
-                       r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
-               }
-               /* Increment count */
-               ctx->count++;
-               
-               if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
-                       continue;
-               }
-               else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) {
-                       return CLIENT_ERROR;
-               }
-               else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) {
-                       return EXISTS;
-               }
-               else {
-                       return SERVER_ERROR;
-               }
+               ctx->cmd = cmd;
+               ctx->op = CMD_WRITE;
+               ctx->param = &params[i];
+               event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+               event_add (&ctx->mem_ev, &ctx->timeout);
        }
 
        return OK;
@@ -468,71 +575,14 @@ memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, si
 memc_error_t
 memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem)
 {
-       char read_buf[READ_BUFSIZ];
-       int i, retries;
-       ssize_t r;
-       struct memc_udp_header header;
-       struct iovec iov[2];
+       int i;
        
        for (i = 0; i < *nelem; i++) {
-               if (ctx->protocol == UDP_TEXT) {
-                       /* Send udp header */
-                       bzero (&header, sizeof (header));
-                       header.dg_sent = htons(1);
-                       header.req_id = ctx->count;
-               }
-
-               r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, params[i].key);
-               memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf);
-               if (ctx->protocol == UDP_TEXT) {
-                       iov[0].iov_base = &header;
-                       iov[0].iov_len = sizeof (struct memc_udp_header);
-                       iov[1].iov_base = read_buf;
-                       iov[1].iov_len = r;
-                       writev (ctx->sock, iov, 2);
-               }
-               else {
-                       write (ctx->sock, read_buf, r);
-               }
-
-               /* Read reply from server */
-               retries = 0;
-               while (ctx->protocol == UDP_TEXT) {
-                       if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
-                               return SERVER_TIMEOUT;
-                       }
-                       iov[0].iov_base = &header;
-                       iov[0].iov_len = sizeof (struct memc_udp_header);
-                       iov[1].iov_base = read_buf;
-                       iov[1].iov_len = READ_BUFSIZ;
-                       if ((r = readv (ctx->sock, iov, 2)) == -1) {
-                               return SERVER_ERROR;
-                       }
-                       if (header.req_id != ctx->count && retries < MAX_RETRIES) {
-                               retries ++;
-                               /* Not our reply packet */
-                               continue;
-                       }
-                       break;
-               }
-               if (ctx->protocol != UDP_TEXT) {
-                       if (poll_d (ctx->sock, 1, 0, ctx->timeout) != 1) {
-                               return SERVER_TIMEOUT;
-                       }
-                       r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
-               }
-               
-               /* Increment count */
-               ctx->count++;
-               if (strncmp (read_buf, DELETED_TRAILER, sizeof (DELETED_TRAILER) - 1) == 0) {
-                       continue;
-               }
-               else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) {
-                       return NOT_EXISTS;
-               }
-               else {
-                       return SERVER_ERROR;
-               }
+               ctx->cmd = "delete";
+               ctx->op = CMD_DELETE;
+               ctx->param = &params[i];
+               event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+               event_add (&ctx->mem_ev, &ctx->timeout);
        }
 
        return OK;
@@ -630,6 +680,11 @@ memc_init_ctx (memcached_ctx_t *ctx)
 
        ctx->count = 0;
        ctx->alive = 1;
+       ctx->op = CMD_NULL;
+       /* Set default callback */
+       if (ctx->callback == NULL) {
+               ctx->callback = common_memc_callback;
+       }
 
        switch (ctx->protocol) {
                case UDP_TEXT:
@@ -675,6 +730,7 @@ int
 memc_close_ctx (memcached_ctx_t *ctx)
 {
        if (ctx != NULL && ctx->sock != -1) {
+               event_del (&ctx->mem_ev);
                return close (ctx->sock);
        }
 
index c0ae896e3dc36000bfcdcc5968523b4d76bd3e4b..f3872456dda481108cbe64faf362eb9ad51b1441 100644 (file)
@@ -3,11 +3,15 @@
 
 #include <sys/types.h>
 #include <netinet/in.h>
+#include <sys/time.h>
+#include <time.h>
 
 #define MAXKEYLEN 250
 
 #define MEMC_OPT_DEBUG 0x1
 
+struct event;
+
 typedef enum memc_error {
        OK,
        BAD_COMMAND,
@@ -27,26 +31,51 @@ typedef enum memc_proto {
        TCP_BIN
 } memc_proto_t;
 
+typedef enum memc_op {
+       CMD_NULL,
+       CMD_CONNECT,
+       CMD_READ,
+       CMD_WRITE,
+       CMD_DELETE,
+} memc_opt_t;
+
+typedef struct memcached_param_s {
+       char key[MAXKEYLEN];
+       u_char *buf;
+       size_t bufsize;
+       size_t bufpos;
+       int expire;
+} memcached_param_t;
+
+
 /* Port must be in network byte order */
 typedef struct memcached_ctx_s {
        memc_proto_t protocol;
        struct in_addr addr;
        uint16_t port;
        int sock;
-       int timeout;
+       struct timeval timeout;
        /* Counter that is used for memcached operations in network byte order */
        uint16_t count;
        /* Flag that signalize that this memcached is alive */
        short alive;
        /* Options that can be specified for memcached connection */
        short options;
+       /* Current operation */
+       memc_opt_t op;
+       /* Event structure */
+       struct event mem_ev;
+       /* Current command */
+       const char *cmd;
+       /* Current param */
+       memcached_param_t *param;
+       /* Callback for current operation */
+       void (*callback) (struct memcached_ctx_s *ctx, memc_error_t error, void *data);
+       /* Data for callback function */
+       void *callback_data;
 } memcached_ctx_t;
 
-typedef struct memcached_param_s {
-       char key[MAXKEYLEN];
-       u_char *buf;
-       size_t bufsize;
-} memcached_param_t;
+typedef void (*memcached_callback_t) (memcached_ctx_t *ctx, memc_error_t error, void *data);
 
 /* 
  * Initialize connection to memcached server:
diff --git a/util.c b/util.c
index 37ceacd44d681007eace8f09c6bc268b51af7183..52591983a4c0f7a42c440f32623edd523708002b 100644 (file)
--- a/util.c
+++ b/util.c
@@ -196,7 +196,7 @@ pass_signal_worker (struct workq *workers, int signo)
 
 void convert_to_lowercase (char *str, unsigned int size)
 {
-       while (size --) {
+       while (size--) {
                *str = tolower (*str ++);
        }
 }
@@ -277,7 +277,7 @@ init_title(int argc, char *argv[], char *envp[])
        if (!end_of_buffer)
                return 0;
 
-       char  **new_environ = malloc ((i + 1) * sizeof (envp[0]));
+       char  **new_environ = g_malloc ((i + 1) * sizeof (envp[0]));
 
        if (!new_environ)
                return 0;
@@ -375,7 +375,7 @@ pidfile_open(const char *path, mode_t mode, pid_t *pidptr)
        int error, fd, len, count;
        struct timespec rqtp;
 
-       pfh = malloc(sizeof(*pfh));
+       pfh = g_malloc(sizeof(*pfh));
        if (pfh == NULL)
                return (NULL);