diff options
author | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2008-11-01 18:01:05 +0300 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@rambler-co.ru> | 2008-11-01 18:01:05 +0300 |
commit | 2aa9c74f1c449da92f6faf870f8cc801a83bb08b (patch) | |
tree | 33f0f941f08583fd0c4c3653cadde8d6ce8426c2 /src/memcached.c | |
parent | cc5343692b448c27485a24ea7f1b24d714bb82f6 (diff) | |
download | rspamd-2aa9c74f1c449da92f6faf870f8cc801a83bb08b.tar.gz rspamd-2aa9c74f1c449da92f6faf870f8cc801a83bb08b.zip |
* Reorganize structure of source files
* Adopt build system for new structure
--HG--
rename : cfg_file.h => src/cfg_file.h
rename : cfg_file.l => src/cfg_file.l
rename : cfg_file.y => src/cfg_file.y
rename : cfg_utils.c => src/cfg_utils.c
rename : controller.c => src/controller.c
rename : filter.c => src/filter.c
rename : filter.h => src/filter.h
rename : fstring.c => src/fstring.c
rename : fstring.h => src/fstring.h
rename : main.c => src/main.c
rename : main.h => src/main.h
rename : mem_pool.c => src/mem_pool.c
rename : mem_pool.h => src/mem_pool.h
rename : memcached-test.c => src/memcached-test.c
rename : memcached.c => src/memcached.c
rename : memcached.h => src/memcached.h
rename : perl.c => src/perl.c
rename : perl.h => src/perl.h
rename : plugins/regexp.c => src/plugins/regexp.c
rename : plugins/surbl.c => src/plugins/surbl.c
rename : protocol.c => src/protocol.c
rename : protocol.h => src/protocol.h
rename : upstream.c => src/upstream.c
rename : upstream.h => src/upstream.h
rename : url.c => src/url.c
rename : url.h => src/url.h
rename : util.c => src/util.c
rename : util.h => src/util.h
rename : worker.c => src/worker.c
Diffstat (limited to 'src/memcached.c')
-rw-r--r-- | src/memcached.c | 792 |
1 files changed, 792 insertions, 0 deletions
diff --git a/src/memcached.c b/src/memcached.c new file mode 100644 index 000000000..05ae16617 --- /dev/null +++ b/src/memcached.c @@ -0,0 +1,792 @@ +#ifdef _THREAD_SAFE +#include <pthread.h> +#endif + +#include <stdarg.h> + +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/param.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sysexits.h> +#include <unistd.h> +#include <syslog.h> + +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/socket.h> +#include <sys/poll.h> +#include <errno.h> +#include <fcntl.h> +#include <sys/uio.h> +#include <event.h> +#include <glib.h> + +#include "memcached.h" + +#define CRLF "\r\n" +#define END_TRAILER "END" CRLF +#define STORED_TRAILER "STORED" CRLF +#define NOT_STORED_TRAILER "NOT STORED" CRLF +#define EXISTS_TRAILER "EXISTS" CRLF +#define DELETED_TRAILER "DELETED" CRLF +#define NOT_FOUND_TRAILER "NOT_FOUND" CRLF +#define CLIENT_ERROR_TRAILER "CLIENT_ERROR" +#define SERVER_ERROR_TRAILER "SERVER_ERROR" + +#define READ_BUFSIZ 1500 +#define MAX_RETRIES 3 + +/* Header for udp protocol */ +struct memc_udp_header +{ + uint16_t req_id; + uint16_t seq_num; + uint16_t dg_sent; + uint16_t unused; +}; + +static void socket_callback (int fd, short what, void *arg); +static int memc_parse_header (char *buf, size_t *len, char **end); + +/* + * Write to syslog if OPT_DEBUG is specified + */ +static void +memc_log (const memcached_ctx_t *ctx, int line, const char *fmt, ...) +{ + va_list args; + if (ctx->options & MEMC_OPT_DEBUG) { + va_start (args, fmt); + g_log (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port)); + g_logv (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, fmt, args); + va_end (args); + } +} + +/* + * Callback for write command + */ +static void +write_handler (int fd, short what, memcached_ctx_t *ctx) +{ + char read_buf[READ_BUFSIZ]; + int retries; + ssize_t r; + struct memc_udp_header header; + struct iovec iov[4]; + + /* Write something to memcached */ + if (what == EV_WRITE) { + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + + r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize); + memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf); + + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + if (ctx->param->bufpos == 0) { + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + } + else { + iov[1].iov_base = NULL; + iov[1].iov_len = 0; + } + iov[2].iov_base = ctx->param->buf + ctx->param->bufpos; + iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos; + iov[3].iov_base = CRLF; + iov[3].iov_len = sizeof (CRLF) - 1; + writev (ctx->sock, iov, 4); + } + else { + iov[0].iov_base = read_buf; + iov[0].iov_len = r; + iov[1].iov_base = ctx->param->buf + ctx->param->bufpos; + iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos; + iov[2].iov_base = CRLF; + iov[2].iov_len = sizeof (CRLF) - 1; + writev (ctx->sock, iov, 3); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + /* Read header */ + retries = 0; + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + retries ++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + memc_log (ctx, __LINE__, "memc_write: read reply from memcached: %s", read_buf); + /* Increment count */ + ctx->count++; + event_del (&ctx->mem_ev); + if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, OK, ctx->callback_data); + } + else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data); + } + else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) { + ctx->callback (ctx, EXISTS, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + } + else if (what == EV_TIMEOUT) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + } +} + +/* + * Callback for read command + */ +static void +read_handler (int fd, short what, memcached_ctx_t *ctx) +{ + char read_buf[READ_BUFSIZ]; + char *p; + ssize_t r; + size_t datalen; + struct memc_udp_header header; + struct iovec iov[2]; + int retries = 0, t; + + if (what == EV_WRITE) { + /* Send command to memcached */ + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + + r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key); + memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf); + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + writev (ctx->sock, iov, 2); + } + else { + write (ctx->sock, read_buf, r); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + return; + } + memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf); + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count); + retries++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + + if (r > 0) { + read_buf[r] = 0; + if (ctx->param->bufpos == 0) { + t = memc_parse_header (read_buf, &datalen, &p); + if (t < 0) { + event_del (&ctx->mem_ev); + memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply"); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + return; + } + else if (t == 0) { + memc_log (ctx, __LINE__, "memc_read: record does not exists"); + event_del (&ctx->mem_ev); + ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); + return; + } + + if (datalen > ctx->param->bufsize) { + memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen); + event_del (&ctx->mem_ev); + ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data); + return; + } + /* Check if we already have all data in buffer */ + if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) { + /* Store all data in param's buffer */ + memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen); + /* Increment count */ + ctx->count++; + event_del (&ctx->mem_ev); + ctx->callback (ctx, OK, ctx->callback_data); + return; + } + /* Subtract from sum parsed header's length */ + r -= p - read_buf; + } + else { + p = read_buf; + } + + if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2, + END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { + r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2; + memcpy (ctx->param->buf + ctx->param->bufpos, p, r); + event_del (&ctx->mem_ev); + ctx->callback (ctx, OK, ctx->callback_data); + return; + } + /* Store this part of data in param's buffer */ + memcpy (ctx->param->buf + ctx->param->bufpos, p, r); + ctx->param->bufpos += r; + } + else { + memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r); + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + return; + } + + ctx->count++; + } + else if (what == EV_TIMEOUT) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + } + +} + +/* + * Callback for delete command + */ +static void +delete_handler (int fd, short what, memcached_ctx_t *ctx) +{ + char read_buf[READ_BUFSIZ]; + int retries; + ssize_t r; + struct memc_udp_header header; + struct iovec iov[2]; + + /* Write something to memcached */ + if (what == EV_WRITE) { + if (ctx->protocol == UDP_TEXT) { + /* Send udp header */ + bzero (&header, sizeof (header)); + header.dg_sent = htons (1); + header.req_id = ctx->count; + } + r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key); + memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf); + + if (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = r; + ctx->param->bufpos = writev (ctx->sock, iov, 2); + } + else { + write (ctx->sock, read_buf, r); + } + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + } + else if (what == EV_READ) { + /* Read header */ + retries = 0; + while (ctx->protocol == UDP_TEXT) { + iov[0].iov_base = &header; + iov[0].iov_len = sizeof (struct memc_udp_header); + iov[1].iov_base = read_buf; + iov[1].iov_len = READ_BUFSIZ; + if ((r = readv (ctx->sock, iov, 2)) == -1) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + return; + } + if (header.req_id != ctx->count && retries < MAX_RETRIES) { + retries ++; + /* Not our reply packet */ + continue; + } + break; + } + if (ctx->protocol != UDP_TEXT) { + r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); + } + /* Increment count */ + ctx->count++; + event_del (&ctx->mem_ev); + if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { + ctx->callback (ctx, OK, ctx->callback_data); + } + else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) { + ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); + } + else { + ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); + } + } + else if (what == EV_TIMEOUT) { + event_del (&ctx->mem_ev); + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + } +} + +/* + * Callback for our socket events + */ +static void +socket_callback (int fd, short what, void *arg) +{ + memcached_ctx_t *ctx = (memcached_ctx_t *)arg; + + switch (ctx->op) { + case CMD_NULL: + /* Do nothing here */ + break; + case CMD_CONNECT: + /* We have write readiness after connect call, so reinit event */ + ctx->cmd = "connect"; + if (what == EV_WRITE) { + event_del (&ctx->mem_ev); + event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, NULL); + ctx->callback (ctx, OK, ctx->callback_data); + ctx->alive = 1; + } + else { + ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + ctx->alive = 0; + } + break; + case CMD_WRITE: + write_handler (fd, what, ctx); + break; + case CMD_READ: + read_handler (fd, what, ctx); + break; + case CMD_DELETE: + delete_handler (fd, what, ctx); + break; + } +} + +/* + * Common callback function for memcached operations if no user's callback is specified + */ +static void +common_memc_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error)); +} + +/* + * Make socket for udp connection + */ +static int +memc_make_udp_sock (memcached_ctx_t *ctx) +{ + struct sockaddr_in sc; + int ofl; + + bzero (&sc, sizeof (struct sockaddr_in *)); + sc.sin_family = AF_INET; + sc.sin_port = ctx->port; + memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr)); + + ctx->sock = socket (PF_INET, SOCK_DGRAM, 0); + + if (ctx->sock == -1) { + memc_log (ctx, __LINE__, "memc_make_udp_sock: socket() failed: %m"); + return -1; + } + + /* set nonblocking */ + ofl = fcntl(ctx->sock, F_GETFL, 0); + fcntl(ctx->sock, F_SETFL, ofl | O_NONBLOCK); + + /* + * Call connect to set default destination for datagrams + * May not block + */ + ctx->op = CMD_CONNECT; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, NULL); + return connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in)); +} + +/* + * Make socket for tcp connection + */ +static int +memc_make_tcp_sock (memcached_ctx_t *ctx) +{ + struct sockaddr_in sc; + int ofl, r; + + bzero (&sc, sizeof (struct sockaddr_in *)); + sc.sin_family = AF_INET; + sc.sin_port = ctx->port; + memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr)); + + ctx->sock = socket (PF_INET, SOCK_STREAM, 0); + + if (ctx->sock == -1) { + memc_log (ctx, __LINE__, "memc_make_tcp_sock: socket() failed: %m"); + return -1; + } + + /* set nonblocking */ + ofl = fcntl(ctx->sock, F_GETFL, 0); + fcntl(ctx->sock, F_SETFL, ofl | O_NONBLOCK); + + if ((r = connect (ctx->sock, (struct sockaddr*)&sc, sizeof (struct sockaddr_in))) == -1) { + if (errno != EINPROGRESS) { + close (ctx->sock); + ctx->sock = -1; + memc_log (ctx, __LINE__, "memc_make_tcp_sock: connect() failed: %m"); + return -1; + } + } + ctx->op = CMD_CONNECT; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + return 0; +} + +/* + * Parse VALUE reply from server and set len argument to value returned by memcached + */ +static int +memc_parse_header (char *buf, size_t *len, char **end) +{ + char *p, *c; + int i; + + /* VALUE <key> <flags> <bytes> [<cas unique>]\r\n */ + c = strstr (buf, CRLF); + if (c == NULL) { + return -1; + } + *end = c + sizeof (CRLF) - 1; + + if (strncmp (buf, "VALUE ", sizeof ("VALUE ") - 1) == 0) { + p = buf + sizeof ("VALUE ") - 1; + + /* Read bytes value and ignore all other fields, such as flags and key */ + for (i = 0; i < 2; i++) { + while (p++ < c && *p != ' '); + + if (p > c) { + return -1; + } + } + *len = strtoul (p, &c, 10); + return 1; + } + /* If value not found memcached return just END\r\n , in this case return 0 */ + else if (strncmp (buf, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { + return 0; + } + + return -1; +} + + +/* + * Common read command handler for memcached + */ +memc_error_t +memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param) +{ + ctx->cmd = cmd; + ctx->op = CMD_READ; + ctx->param = param; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + + return OK; +} + +/* + * Common write command handler for memcached + */ +memc_error_t +memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire) +{ + ctx->cmd = cmd; + ctx->op = CMD_WRITE; + ctx->param = param; + param->expire = expire; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + + return OK; +} +/* + * Delete command handler + */ +memc_error_t +memc_delete (memcached_ctx_t *ctx, memcached_param_t *param) +{ + ctx->cmd = "delete"; + ctx->op = CMD_DELETE; + ctx->param = param; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); + + return OK; +} + +/* + * Write handler for memcached mirroring + * writing is done to each memcached server + */ +memc_error_t +memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire) +{ + memc_error_t r, result = OK; + + while (memcached_num --) { + if (ctx[memcached_num].alive == 1) { + r = memc_write (&ctx[memcached_num], cmd, param, expire); + if (r != OK) { + memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r)); + result = r; + ctx[memcached_num].alive = 0; + } + } + } + + return result; +} + +/* + * Read handler for memcached mirroring + * reading is done from first active memcached server + */ +memc_error_t +memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param) +{ + memc_error_t r, result = OK; + + while (memcached_num --) { + if (ctx[memcached_num].alive == 1) { + r = memc_read (&ctx[memcached_num], cmd, param); + if (r != OK) { + result = r; + if (r != NOT_EXISTS) { + ctx[memcached_num].alive = 0; + memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: cannot write read from mirror server: %s", memc_strerror (r)); + } + else { + memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: record not exists", memc_strerror (r)); + } + } + else { + break; + } + } + } + + return result; +} + +/* + * Delete handler for memcached mirroring + * deleting is done for each active memcached server + */ +memc_error_t +memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param) +{ + memc_error_t r, result = OK; + + while (memcached_num --) { + if (ctx[memcached_num].alive == 1) { + r = memc_delete (&ctx[memcached_num], param); + if (r != OK) { + result = r; + if (r != NOT_EXISTS) { + ctx[memcached_num].alive = 0; + memc_log (&ctx[memcached_num], __LINE__, "memc_delete_mirror: cannot delete from mirror server: %s", memc_strerror (r)); + } + } + } + } + + return result; +} + + +/* + * Initialize memcached context for specified protocol + */ +int +memc_init_ctx (memcached_ctx_t *ctx) +{ + if (ctx == NULL) { + return -1; + } + + ctx->count = 0; + ctx->alive = 0; + ctx->op = CMD_NULL; + /* Set default callback */ + if (ctx->callback == NULL) { + ctx->callback = common_memc_callback; + } + + switch (ctx->protocol) { + case UDP_TEXT: + return memc_make_udp_sock (ctx); + break; + case TCP_TEXT: + return memc_make_tcp_sock (ctx); + break; + /* Not implemented */ + case UDP_BIN: + case TCP_BIN: + default: + return -1; + } +} +/* + * Mirror init + */ +int +memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num) +{ + int r, result = -1; + while (memcached_num--) { + if (ctx[memcached_num].alive == 1) { + r = memc_init_ctx (&ctx[memcached_num]); + if (r == -1) { + ctx[memcached_num].alive = 0; + memc_log (&ctx[memcached_num], __LINE__, "memc_init_ctx_mirror: cannot connect to server"); + } + else { + result = 1; + } + } + } + + return result; +} + +/* + * Close context connection + */ +int +memc_close_ctx (memcached_ctx_t *ctx) +{ + if (ctx != NULL && ctx->sock != -1) { + event_del (&ctx->mem_ev); + return close (ctx->sock); + } + + return -1; +} +/* + * Mirror close + */ +int +memc_close_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num) +{ + int r = 0; + while (memcached_num--) { + if (ctx[memcached_num].alive == 1) { + r = memc_close_ctx (&ctx[memcached_num]); + if (r == -1) { + memc_log (&ctx[memcached_num], __LINE__, "memc_close_ctx_mirror: cannot close connection to server properly"); + ctx[memcached_num].alive = 0; + } + } + } + + return r; +} + + +const char * memc_strerror (memc_error_t err) +{ + const char *p; + + switch (err) { + case OK: + p = "Ok"; + break; + case BAD_COMMAND: + p = "Bad command"; + break; + case CLIENT_ERROR: + p = "Client error"; + break; + case SERVER_ERROR: + p = "Server error"; + break; + case SERVER_TIMEOUT: + p = "Server timeout"; + break; + case NOT_EXISTS: + p = "Key not found"; + break; + case EXISTS: + p = "Key already exists"; + break; + case WRONG_LENGTH: + p = "Wrong result length"; + break; + default: + p = "Unknown error"; + break; + } + + return p; +} + +/* + * vi:ts=4 + */ |