summaryrefslogtreecommitdiffstats
path: root/src/memcached.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-11-01 18:01:05 +0300
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-11-01 18:01:05 +0300
commit2aa9c74f1c449da92f6faf870f8cc801a83bb08b (patch)
tree33f0f941f08583fd0c4c3653cadde8d6ce8426c2 /src/memcached.c
parentcc5343692b448c27485a24ea7f1b24d714bb82f6 (diff)
downloadrspamd-2aa9c74f1c449da92f6faf870f8cc801a83bb08b.tar.gz
rspamd-2aa9c74f1c449da92f6faf870f8cc801a83bb08b.zip
* Reorganize structure of source files
* Adopt build system for new structure --HG-- rename : cfg_file.h => src/cfg_file.h rename : cfg_file.l => src/cfg_file.l rename : cfg_file.y => src/cfg_file.y rename : cfg_utils.c => src/cfg_utils.c rename : controller.c => src/controller.c rename : filter.c => src/filter.c rename : filter.h => src/filter.h rename : fstring.c => src/fstring.c rename : fstring.h => src/fstring.h rename : main.c => src/main.c rename : main.h => src/main.h rename : mem_pool.c => src/mem_pool.c rename : mem_pool.h => src/mem_pool.h rename : memcached-test.c => src/memcached-test.c rename : memcached.c => src/memcached.c rename : memcached.h => src/memcached.h rename : perl.c => src/perl.c rename : perl.h => src/perl.h rename : plugins/regexp.c => src/plugins/regexp.c rename : plugins/surbl.c => src/plugins/surbl.c rename : protocol.c => src/protocol.c rename : protocol.h => src/protocol.h rename : upstream.c => src/upstream.c rename : upstream.h => src/upstream.h rename : url.c => src/url.c rename : url.h => src/url.h rename : util.c => src/util.c rename : util.h => src/util.h rename : worker.c => src/worker.c
Diffstat (limited to 'src/memcached.c')
-rw-r--r--src/memcached.c792
1 files changed, 792 insertions, 0 deletions
diff --git a/src/memcached.c b/src/memcached.c
new file mode 100644
index 000000000..05ae16617
--- /dev/null
+++ b/src/memcached.c
@@ -0,0 +1,792 @@
+#ifdef _THREAD_SAFE
+#include <pthread.h>
+#endif
+
+#include <stdarg.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/param.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sysexits.h>
+#include <unistd.h>
+#include <syslog.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <sys/poll.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/uio.h>
+#include <event.h>
+#include <glib.h>
+
+#include "memcached.h"
+
+#define CRLF "\r\n"
+#define END_TRAILER "END" CRLF
+#define STORED_TRAILER "STORED" CRLF
+#define NOT_STORED_TRAILER "NOT STORED" CRLF
+#define EXISTS_TRAILER "EXISTS" CRLF
+#define DELETED_TRAILER "DELETED" CRLF
+#define NOT_FOUND_TRAILER "NOT_FOUND" CRLF
+#define CLIENT_ERROR_TRAILER "CLIENT_ERROR"
+#define SERVER_ERROR_TRAILER "SERVER_ERROR"
+
+#define READ_BUFSIZ 1500
+#define MAX_RETRIES 3
+
+/* Header for udp protocol */
+struct memc_udp_header
+{
+ uint16_t req_id;
+ uint16_t seq_num;
+ uint16_t dg_sent;
+ uint16_t unused;
+};
+
+static void socket_callback (int fd, short what, void *arg);
+static int memc_parse_header (char *buf, size_t *len, char **end);
+
+/*
+ * Write to syslog if OPT_DEBUG is specified
+ */
+static void
+memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...)
+{
+ va_list args;
+ if (ctx->options & MEMC_OPT_DEBUG) {
+ va_start (args, fmt);
+ g_log (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
+ g_logv (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, fmt, args);
+ va_end (args);
+ }
+}
+
+/*
+ * Callback for write command
+ */
+static void
+write_handler (int fd, short what, memcached_ctx_t *ctx)
+{
+ char read_buf[READ_BUFSIZ];
+ int retries;
+ ssize_t r;
+ struct memc_udp_header header;
+ struct iovec iov[4];
+
+ /* Write something to memcached */
+ if (what == EV_WRITE) {
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+
+ r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize);
+ memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf);
+
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ if (ctx->param->bufpos == 0) {
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ }
+ else {
+ iov[1].iov_base = NULL;
+ iov[1].iov_len = 0;
+ }
+ iov[2].iov_base = ctx->param->buf + ctx->param->bufpos;
+ iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+ iov[3].iov_base = CRLF;
+ iov[3].iov_len = sizeof (CRLF) - 1;
+ writev (ctx->sock, iov, 4);
+ }
+ else {
+ iov[0].iov_base = read_buf;
+ iov[0].iov_len = r;
+ iov[1].iov_base = ctx->param->buf + ctx->param->bufpos;
+ iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos;
+ iov[2].iov_base = CRLF;
+ iov[2].iov_len = sizeof (CRLF) - 1;
+ writev (ctx->sock, iov, 3);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ /* Read header */
+ retries = 0;
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ retries ++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+ memc_log (ctx, __LINE__, "memc_write: read reply from memcached: %s", read_buf);
+ /* Increment count */
+ ctx->count++;
+ event_del (&ctx->mem_ev);
+ if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, EXISTS, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ }
+ else if (what == EV_TIMEOUT) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ }
+}
+
+/*
+ * Callback for read command
+ */
+static void
+read_handler (int fd, short what, memcached_ctx_t *ctx)
+{
+ char read_buf[READ_BUFSIZ];
+ char *p;
+ ssize_t r;
+ size_t datalen;
+ struct memc_udp_header header;
+ struct iovec iov[2];
+ int retries = 0, t;
+
+ if (what == EV_WRITE) {
+ /* Send command to memcached */
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+
+ r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key);
+ memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf);
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ writev (ctx->sock, iov, 2);
+ }
+ else {
+ write (ctx->sock, read_buf, r);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ return;
+ }
+ memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
+ retries++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+
+ if (r > 0) {
+ read_buf[r] = 0;
+ if (ctx->param->bufpos == 0) {
+ t = memc_parse_header (read_buf, &datalen, &p);
+ if (t < 0) {
+ event_del (&ctx->mem_ev);
+ memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ return;
+ }
+ else if (t == 0) {
+ memc_log (ctx, __LINE__, "memc_read: record does not exists");
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+ return;
+ }
+
+ if (datalen > ctx->param->bufsize) {
+ memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen);
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data);
+ return;
+ }
+ /* Check if we already have all data in buffer */
+ if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
+ /* Store all data in param's buffer */
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen);
+ /* Increment count */
+ ctx->count++;
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ return;
+ }
+ /* Subtract from sum parsed header's length */
+ r -= p - read_buf;
+ }
+ else {
+ p = read_buf;
+ }
+
+ if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2,
+ END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
+ r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2;
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ return;
+ }
+ /* Store this part of data in param's buffer */
+ memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
+ ctx->param->bufpos += r;
+ }
+ else {
+ memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ return;
+ }
+
+ ctx->count++;
+ }
+ else if (what == EV_TIMEOUT) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ }
+
+}
+
+/*
+ * Callback for delete command
+ */
+static void
+delete_handler (int fd, short what, memcached_ctx_t *ctx)
+{
+ char read_buf[READ_BUFSIZ];
+ int retries;
+ ssize_t r;
+ struct memc_udp_header header;
+ struct iovec iov[2];
+
+ /* Write something to memcached */
+ if (what == EV_WRITE) {
+ if (ctx->protocol == UDP_TEXT) {
+ /* Send udp header */
+ bzero (&header, sizeof (header));
+ header.dg_sent = htons (1);
+ header.req_id = ctx->count;
+ }
+ r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key);
+ memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf);
+
+ if (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = r;
+ ctx->param->bufpos = writev (ctx->sock, iov, 2);
+ }
+ else {
+ write (ctx->sock, read_buf, r);
+ }
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ }
+ else if (what == EV_READ) {
+ /* Read header */
+ retries = 0;
+ while (ctx->protocol == UDP_TEXT) {
+ iov[0].iov_base = &header;
+ iov[0].iov_len = sizeof (struct memc_udp_header);
+ iov[1].iov_base = read_buf;
+ iov[1].iov_len = READ_BUFSIZ;
+ if ((r = readv (ctx->sock, iov, 2)) == -1) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ return;
+ }
+ if (header.req_id != ctx->count && retries < MAX_RETRIES) {
+ retries ++;
+ /* Not our reply packet */
+ continue;
+ }
+ break;
+ }
+ if (ctx->protocol != UDP_TEXT) {
+ r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
+ }
+ /* Increment count */
+ ctx->count++;
+ event_del (&ctx->mem_ev);
+ if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, OK, ctx->callback_data);
+ }
+ else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) {
+ ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
+ }
+ else {
+ ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
+ }
+ }
+ else if (what == EV_TIMEOUT) {
+ event_del (&ctx->mem_ev);
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ }
+}
+
+/*
+ * Callback for our socket events
+ */
+static void
+socket_callback (int fd, short what, void *arg)
+{
+ memcached_ctx_t *ctx = (memcached_ctx_t *)arg;
+
+ switch (ctx->op) {
+ case CMD_NULL:
+ /* Do nothing here */
+ break;
+ case CMD_CONNECT:
+ /* We have write readiness after connect call, so reinit event */
+ ctx->cmd = "connect";
+ if (what == EV_WRITE) {
+ event_del (&ctx->mem_ev);
+ event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, NULL);
+ ctx->callback (ctx, OK, ctx->callback_data);
+ ctx->alive = 1;
+ }
+ else {
+ ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ ctx->alive = 0;
+ }
+ break;
+ case CMD_WRITE:
+ write_handler (fd, what, ctx);
+ break;
+ case CMD_READ:
+ read_handler (fd, what, ctx);
+ break;
+ case CMD_DELETE:
+ delete_handler (fd, what, ctx);
+ break;
+ }
+}
+
+/*
+ * Common callback function for memcached operations if no user's callback is specified
+ */
+static void
+common_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error));
+}
+
+/*
+ * Make socket for udp connection
+ */
+static int
+memc_make_udp_sock (memcached_ctx_t *ctx)
+{
+ struct sockaddr_in sc;
+ int ofl;
+
+ bzero (&sc, sizeof (struct sockaddr_in *));
+ sc.sin_family = AF_INET;
+ sc.sin_port = ctx->port;
+ memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr));
+
+ ctx->sock = socket (PF_INET, SOCK_DGRAM, 0);
+
+ if (ctx->sock == -1) {
+ memc_log (ctx, __LINE__, "memc_make_udp_sock: socket() failed: %m");
+ return -1;
+ }
+
+ /* set nonblocking */
+ ofl = fcntl(ctx->sock, F_GETFL, 0);
+ fcntl(ctx->sock, F_SETFL, ofl | O_NONBLOCK);
+
+ /*
+ * Call connect to set default destination for datagrams
+ * May not block
+ */
+ ctx->op = CMD_CONNECT;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, NULL);
+ return connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in));
+}
+
+/*
+ * Make socket for tcp connection
+ */
+static int
+memc_make_tcp_sock (memcached_ctx_t *ctx)
+{
+ struct sockaddr_in sc;
+ int ofl, r;
+
+ bzero (&sc, sizeof (struct sockaddr_in *));
+ sc.sin_family = AF_INET;
+ sc.sin_port = ctx->port;
+ memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr));
+
+ ctx->sock = socket (PF_INET, SOCK_STREAM, 0);
+
+ if (ctx->sock == -1) {
+ memc_log (ctx, __LINE__, "memc_make_tcp_sock: socket() failed: %m");
+ return -1;
+ }
+
+ /* set nonblocking */
+ ofl = fcntl(ctx->sock, F_GETFL, 0);
+ fcntl(ctx->sock, F_SETFL, ofl | O_NONBLOCK);
+
+ if ((r = connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in))) == -1) {
+ if (errno != EINPROGRESS) {
+ close (ctx->sock);
+ ctx->sock = -1;
+ memc_log (ctx, __LINE__, "memc_make_tcp_sock: connect() failed: %m");
+ return -1;
+ }
+ }
+ ctx->op = CMD_CONNECT;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+ return 0;
+}
+
+/*
+ * Parse VALUE reply from server and set len argument to value returned by memcached
+ */
+static int
+memc_parse_header (char *buf, size_t *len, char **end)
+{
+ char *p, *c;
+ int i;
+
+ /* VALUE <key> <flags> <bytes> [<cas unique>]\r\n */
+ c = strstr (buf, CRLF);
+ if (c == NULL) {
+ return -1;
+ }
+ *end = c + sizeof (CRLF) - 1;
+
+ if (strncmp (buf, "VALUE ", sizeof ("VALUE ") - 1) == 0) {
+ p = buf + sizeof ("VALUE ") - 1;
+
+ /* Read bytes value and ignore all other fields, such as flags and key */
+ for (i = 0; i < 2; i++) {
+ while (p++ < c && *p != ' ');
+
+ if (p > c) {
+ return -1;
+ }
+ }
+ *len = strtoul (p, &c, 10);
+ return 1;
+ }
+ /* If value not found memcached return just END\r\n , in this case return 0 */
+ else if (strncmp (buf, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
+ return 0;
+ }
+
+ return -1;
+}
+
+
+/*
+ * Common read command handler for memcached
+ */
+memc_error_t
+memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param)
+{
+ ctx->cmd = cmd;
+ ctx->op = CMD_READ;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+
+ return OK;
+}
+
+/*
+ * Common write command handler for memcached
+ */
+memc_error_t
+memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire)
+{
+ ctx->cmd = cmd;
+ ctx->op = CMD_WRITE;
+ ctx->param = param;
+ param->expire = expire;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+
+ return OK;
+}
+/*
+ * Delete command handler
+ */
+memc_error_t
+memc_delete (memcached_ctx_t *ctx, memcached_param_t *param)
+{
+ ctx->cmd = "delete";
+ ctx->op = CMD_DELETE;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
+
+ return OK;
+}
+
+/*
+ * Write handler for memcached mirroring
+ * writing is done to each memcached server
+ */
+memc_error_t
+memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire)
+{
+ memc_error_t r, result = OK;
+
+ while (memcached_num --) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_write (&ctx[memcached_num], cmd, param, expire);
+ if (r != OK) {
+ memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r));
+ result = r;
+ ctx[memcached_num].alive = 0;
+ }
+ }
+ }
+
+ return result;
+}
+
+/*
+ * Read handler for memcached mirroring
+ * reading is done from first active memcached server
+ */
+memc_error_t
+memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
+{
+ memc_error_t r, result = OK;
+
+ while (memcached_num --) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_read (&ctx[memcached_num], cmd, param);
+ if (r != OK) {
+ result = r;
+ if (r != NOT_EXISTS) {
+ ctx[memcached_num].alive = 0;
+ memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: cannot write read from mirror server: %s", memc_strerror (r));
+ }
+ else {
+ memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: record not exists", memc_strerror (r));
+ }
+ }
+ else {
+ break;
+ }
+ }
+ }
+
+ return result;
+}
+
+/*
+ * Delete handler for memcached mirroring
+ * deleting is done for each active memcached server
+ */
+memc_error_t
+memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
+{
+ memc_error_t r, result = OK;
+
+ while (memcached_num --) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_delete (&ctx[memcached_num], param);
+ if (r != OK) {
+ result = r;
+ if (r != NOT_EXISTS) {
+ ctx[memcached_num].alive = 0;
+ memc_log (&ctx[memcached_num], __LINE__, "memc_delete_mirror: cannot delete from mirror server: %s", memc_strerror (r));
+ }
+ }
+ }
+ }
+
+ return result;
+}
+
+
+/*
+ * Initialize memcached context for specified protocol
+ */
+int
+memc_init_ctx (memcached_ctx_t *ctx)
+{
+ if (ctx == NULL) {
+ return -1;
+ }
+
+ ctx->count = 0;
+ ctx->alive = 0;
+ ctx->op = CMD_NULL;
+ /* Set default callback */
+ if (ctx->callback == NULL) {
+ ctx->callback = common_memc_callback;
+ }
+
+ switch (ctx->protocol) {
+ case UDP_TEXT:
+ return memc_make_udp_sock (ctx);
+ break;
+ case TCP_TEXT:
+ return memc_make_tcp_sock (ctx);
+ break;
+ /* Not implemented */
+ case UDP_BIN:
+ case TCP_BIN:
+ default:
+ return -1;
+ }
+}
+/*
+ * Mirror init
+ */
+int
+memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num)
+{
+ int r, result = -1;
+ while (memcached_num--) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_init_ctx (&ctx[memcached_num]);
+ if (r == -1) {
+ ctx[memcached_num].alive = 0;
+ memc_log (&ctx[memcached_num], __LINE__, "memc_init_ctx_mirror: cannot connect to server");
+ }
+ else {
+ result = 1;
+ }
+ }
+ }
+
+ return result;
+}
+
+/*
+ * Close context connection
+ */
+int
+memc_close_ctx (memcached_ctx_t *ctx)
+{
+ if (ctx != NULL && ctx->sock != -1) {
+ event_del (&ctx->mem_ev);
+ return close (ctx->sock);
+ }
+
+ return -1;
+}
+/*
+ * Mirror close
+ */
+int
+memc_close_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num)
+{
+ int r = 0;
+ while (memcached_num--) {
+ if (ctx[memcached_num].alive == 1) {
+ r = memc_close_ctx (&ctx[memcached_num]);
+ if (r == -1) {
+ memc_log (&ctx[memcached_num], __LINE__, "memc_close_ctx_mirror: cannot close connection to server properly");
+ ctx[memcached_num].alive = 0;
+ }
+ }
+ }
+
+ return r;
+}
+
+
+const char * memc_strerror (memc_error_t err)
+{
+ const char *p;
+
+ switch (err) {
+ case OK:
+ p = "Ok";
+ break;
+ case BAD_COMMAND:
+ p = "Bad command";
+ break;
+ case CLIENT_ERROR:
+ p = "Client error";
+ break;
+ case SERVER_ERROR:
+ p = "Server error";
+ break;
+ case SERVER_TIMEOUT:
+ p = "Server timeout";
+ break;
+ case NOT_EXISTS:
+ p = "Key not found";
+ break;
+ case EXISTS:
+ p = "Key already exists";
+ break;
+ case WRONG_LENGTH:
+ p = "Wrong result length";
+ break;
+ default:
+ p = "Unknown error";
+ break;
+ }
+
+ return p;
+}
+
+/*
+ * vi:ts=4
+ */