diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-04-30 13:32:36 +0100 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-04-30 13:32:36 +0100 |
commit | 40c6406e4a8bf633fee611a8c0d5ff980f7d3836 (patch) | |
tree | 1e3a005b4c182781d8c1221e5d7c75e4fb398929 | |
parent | d6643f35d783784911ad2e2ca754bcfed29eb11d (diff) | |
download | rspamd-40c6406e4a8bf633fee611a8c0d5ff980f7d3836.tar.gz rspamd-40c6406e4a8bf633fee611a8c0d5ff980f7d3836.zip |
Remove memcached support.
-rw-r--r-- | config.h.in | 10 | ||||
-rw-r--r-- | src/libserver/cfg_file.h | 24 | ||||
-rw-r--r-- | src/libserver/cfg_utils.c | 6 | ||||
-rw-r--r-- | src/libutil/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/libutil/memcached.c | 831 | ||||
-rw-r--r-- | src/libutil/memcached.h | 142 | ||||
-rw-r--r-- | src/main.h | 1 | ||||
-rw-r--r-- | src/memcached-test.c | 103 | ||||
-rw-r--r-- | src/plugins/surbl.c | 107 | ||||
-rw-r--r-- | src/plugins/surbl.h | 9 |
10 files changed, 6 insertions, 1228 deletions
diff --git a/config.h.in b/config.h.in index d44f61636..b7d4ed74c 100644 --- a/config.h.in +++ b/config.h.in @@ -461,19 +461,19 @@ typedef off_t goffset; /* Forwarded declaration */ struct module_ctx; -struct config_file; +struct rspamd_config; struct rspamd_worker; typedef struct module_s { const gchar *name; - int (*module_init_func)(struct config_file *cfg, struct module_ctx **ctx); - int (*module_config_func)(struct config_file *cfg); - int (*module_reconfig_func)(struct config_file *cfg); + int (*module_init_func)(struct rspamd_config *cfg, struct module_ctx **ctx); + int (*module_config_func)(struct rspamd_config *cfg); + int (*module_reconfig_func)(struct rspamd_config *cfg); } module_t; typedef struct worker_s { const gchar *name; - gpointer (*worker_init_func)(struct config_file *cfg); + gpointer (*worker_init_func)(struct rspamd_config *cfg); void (*worker_start_func)(struct rspamd_worker *worker); gboolean has_socket; gboolean unique; diff --git a/src/libserver/cfg_file.h b/src/libserver/cfg_file.h index ae809ea25..f502d5775 100644 --- a/src/libserver/cfg_file.h +++ b/src/libserver/cfg_file.h @@ -9,7 +9,6 @@ #include "config.h" #include "mem_pool.h" #include "upstream.h" -#include "memcached.h" #include "symbols_cache.h" #include "cfg_rcl.h" #include "utlist.h" @@ -17,10 +16,6 @@ #define DEFAULT_BIND_PORT 11333 #define DEFAULT_CONTROL_PORT 11334 -#define MAX_MEMCACHED_SERVERS 4 -#define DEFAULT_MEMCACHED_PORT 11211 -/* Memcached timeouts */ -#define DEFAULT_MEMCACHED_CONNECT_TIMEOUT 1000 /* Upstream timeouts */ #define DEFAULT_UPSTREAM_ERROR_TIME 10 #define DEFAULT_UPSTREAM_ERROR_TIME 10 @@ -88,17 +83,6 @@ struct rspamd_regexp { }; /** - * Memcached server object - */ -struct memcached_server { - struct upstream up; /**< common upstream base */ - struct in_addr addr; /**< address of server */ - guint16 port; /**< port to connect */ - short alive; /**< is this server alive */ - gint16 num; /**< number of servers in case of mirror */ -}; - -/** * script module list item */ struct script_module { @@ -299,14 +283,6 @@ struct rspamd_config { guint32 statfile_sync_timeout; /**< synchronization timeout */ gboolean mlock_statfile_pool; /**< use mlock (2) for locking statfiles */ - struct memcached_server memcached_servers[MAX_MEMCACHED_SERVERS]; /**< memcached servers */ - gsize memcached_servers_num; /**< number of memcached servers */ - memc_proto_t memcached_protocol; /**< memcached protocol */ - guint memcached_error_time; /**< memcached error time (see upstream documentation) */ - guint memcached_dead_time; /**< memcached dead time */ - guint memcached_maxerrors; /**< maximum number of errors */ - guint memcached_connect_timeout; /**< connection timeout */ - gboolean delivery_enable; /**< is delivery agent is enabled */ gchar *deliver_host; /**< host for mail deliviring */ struct in_addr deliver_addr; /**< its address */ diff --git a/src/libserver/cfg_utils.c b/src/libserver/cfg_utils.c index 120128f5e..f2b56b364 100644 --- a/src/libserver/cfg_utils.c +++ b/src/libserver/cfg_utils.c @@ -261,12 +261,6 @@ rspamd_parse_bind_line (struct rspamd_config *cfg, struct rspamd_worker_conf *cf void rspamd_config_defaults (struct rspamd_config *cfg) { - - cfg->memcached_error_time = DEFAULT_UPSTREAM_ERROR_TIME; - cfg->memcached_dead_time = DEFAULT_UPSTREAM_DEAD_TIME; - cfg->memcached_maxerrors = DEFAULT_UPSTREAM_MAXERRORS; - cfg->memcached_protocol = TCP_TEXT; - cfg->dns_timeout = 1000; cfg->dns_retransmits = 5; /* After 20 errors do throttling for 10 seconds */ diff --git a/src/libutil/CMakeLists.txt b/src/libutil/CMakeLists.txt index 2a5ab46c5..1475022f6 100644 --- a/src/libutil/CMakeLists.txt +++ b/src/libutil/CMakeLists.txt @@ -8,7 +8,6 @@ SET(LIBRSPAMDUTILSRC aio_event.c http.c logger.c map.c - memcached.c mem_pool.c printf.c radix.c diff --git a/src/libutil/memcached.c b/src/libutil/memcached.c deleted file mode 100644 index e4c9be9d2..000000000 --- a/src/libutil/memcached.c +++ /dev/null @@ -1,831 +0,0 @@ -/* - * Copyright (c) 2009-2012, Vsevolod Stakhov - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#ifdef _THREAD_SAFE -# include <pthread.h> -#endif - -#include <stdarg.h> - -#include <sys/types.h> -#include <sys/stat.h> -#include <sys/param.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <sysexits.h> -#include <unistd.h> -#include <syslog.h> - -#include <netinet/in.h> -#include <arpa/inet.h> -#include <sys/socket.h> -#include <sys/poll.h> -#include <errno.h> -#include <fcntl.h> -#include <sys/uio.h> -#include <event.h> -#include <glib.h> - -#include "memcached.h" - -#define CRLF "\r\n" -#define END_TRAILER "END" CRLF -#define STORED_TRAILER "STORED" CRLF -#define NOT_STORED_TRAILER "NOT STORED" CRLF -#define EXISTS_TRAILER "EXISTS" CRLF -#define DELETED_TRAILER "DELETED" CRLF -#define NOT_FOUND_TRAILER "NOT_FOUND" CRLF -#define CLIENT_ERROR_TRAILER "CLIENT_ERROR" -#define SERVER_ERROR_TRAILER "SERVER_ERROR" - -#define READ_BUFSIZ 1500 -#define MAX_RETRIES 3 - -/* Header for udp protocol */ -struct memc_udp_header { - guint16 req_id; - guint16 seq_num; - guint16 dg_sent; - guint16 unused; -}; - -static void socket_callback (gint fd, short what, void *arg); -static gint memc_parse_header (gchar *buf, size_t * len, gchar **end); - -/* - * Write to syslog if OPT_DEBUG is specified - */ -static void -memc_log (const memcached_ctx_t * ctx, gint line, const gchar *fmt, ...) -{ - va_list args; - if (ctx->options & MEMC_OPT_DEBUG) { - va_start (args, fmt); - g_log (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port)); - g_logv (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, fmt, args); - va_end (args); - } -} - -/* - * Callback for write command - */ -static void -write_handler (gint fd, short what, memcached_ctx_t * ctx) -{ - gchar read_buf[READ_BUFSIZ]; - gint retries; - ssize_t r; - struct memc_udp_header header; - struct iovec iov[4]; - - /* Write something to memcached */ - if (what == EV_WRITE) { - if (ctx->protocol == UDP_TEXT) { - /* Send udp header */ - bzero (&header, sizeof (header)); - header.dg_sent = htons (1); - header.req_id = ctx->count; - } - - r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize); - memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf); - - if (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - if (ctx->param->bufpos == 0) { - iov[1].iov_base = read_buf; - iov[1].iov_len = r; - } - else { - iov[1].iov_base = NULL; - iov[1].iov_len = 0; - } - iov[2].iov_base = ctx->param->buf + ctx->param->bufpos; - iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos; - iov[3].iov_base = CRLF; - iov[3].iov_len = sizeof (CRLF) - 1; - if (writev (ctx->sock, iov, 4) == -1) { - memc_log (ctx, __LINE__, "memc_write: writev failed: %s", strerror (errno)); - } - } - else { - iov[0].iov_base = read_buf; - iov[0].iov_len = r; - iov[1].iov_base = ctx->param->buf + ctx->param->bufpos; - iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos; - iov[2].iov_base = CRLF; - iov[2].iov_len = sizeof (CRLF) - 1; - if (writev (ctx->sock, iov, 3) == -1) { - memc_log (ctx, __LINE__, "memc_write: writev failed: %s", strerror (errno)); - } - } - event_del (&ctx->mem_ev); - event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - } - else if (what == EV_READ) { - /* Read header */ - retries = 0; - while (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - event_del (&ctx->mem_ev); - ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); - } - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - retries++; - /* Not our reply packet */ - continue; - } - break; - } - if (ctx->protocol != UDP_TEXT) { - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - memc_log (ctx, __LINE__, "memc_write: read reply from memcached: %s", read_buf); - /* Increment count */ - ctx->count++; - event_del (&ctx->mem_ev); - if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { - ctx->callback (ctx, OK, ctx->callback_data); - } - else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) { - ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data); - } - else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) { - ctx->callback (ctx, EXISTS, ctx->callback_data); - } - else { - ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); - } - } - else if (what == EV_TIMEOUT) { - event_del (&ctx->mem_ev); - ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); - } -} - -/* - * Callback for read command - */ -static void -read_handler (gint fd, short what, memcached_ctx_t * ctx) -{ - gchar read_buf[READ_BUFSIZ]; - gchar *p; - ssize_t r; - size_t datalen; - struct memc_udp_header header; - struct iovec iov[2]; - gint retries = 0, t; - - if (what == EV_WRITE) { - /* Send command to memcached */ - if (ctx->protocol == UDP_TEXT) { - /* Send udp header */ - bzero (&header, sizeof (header)); - header.dg_sent = htons (1); - header.req_id = ctx->count; - } - - r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key); - memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf); - if (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = r; - if (writev (ctx->sock, iov, 2) == -1) { - memc_log (ctx, __LINE__, "memc_write: writev failed: %s", strerror (errno)); - } - } - else { - if (write (ctx->sock, read_buf, r) == -1) { - memc_log (ctx, __LINE__, "memc_write: write failed: %s", strerror (errno)); - } - } - event_del (&ctx->mem_ev); - event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - } - else if (what == EV_READ) { - while (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - event_del (&ctx->mem_ev); - ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); - return; - } - memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf); - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count); - retries++; - /* Not our reply packet */ - continue; - } - break; - } - if (ctx->protocol != UDP_TEXT) { - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - - if (r > 0) { - read_buf[r] = 0; - if (ctx->param->bufpos == 0) { - t = memc_parse_header (read_buf, &datalen, &p); - if (t < 0) { - event_del (&ctx->mem_ev); - memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply"); - ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); - return; - } - else if (t == 0) { - memc_log (ctx, __LINE__, "memc_read: record does not exists"); - event_del (&ctx->mem_ev); - ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); - return; - } - - if (datalen > ctx->param->bufsize) { - memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen); - event_del (&ctx->mem_ev); - ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data); - return; - } - /* Check if we already have all data in buffer */ - if (r >= (ssize_t)(datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2)) { - /* Store all data in param's buffer */ - memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen); - /* Increment count */ - ctx->count++; - event_del (&ctx->mem_ev); - ctx->callback (ctx, OK, ctx->callback_data); - return; - } - /* Subtract from sum parsed header's length */ - r -= p - read_buf; - } - else { - p = read_buf; - } - - if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { - r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2; - memcpy (ctx->param->buf + ctx->param->bufpos, p, r); - event_del (&ctx->mem_ev); - ctx->callback (ctx, OK, ctx->callback_data); - return; - } - /* Store this part of data in param's buffer */ - memcpy (ctx->param->buf + ctx->param->bufpos, p, r); - ctx->param->bufpos += r; - } - else { - memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %s", r, strerror (errno)); - event_del (&ctx->mem_ev); - ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); - return; - } - - ctx->count++; - } - else if (what == EV_TIMEOUT) { - event_del (&ctx->mem_ev); - ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); - } - -} - -/* - * Callback for delete command - */ -static void -delete_handler (gint fd, short what, memcached_ctx_t * ctx) -{ - gchar read_buf[READ_BUFSIZ]; - gint retries; - ssize_t r; - struct memc_udp_header header; - struct iovec iov[2]; - - /* Write something to memcached */ - if (what == EV_WRITE) { - if (ctx->protocol == UDP_TEXT) { - /* Send udp header */ - bzero (&header, sizeof (header)); - header.dg_sent = htons (1); - header.req_id = ctx->count; - } - r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key); - memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf); - - if (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = r; - ctx->param->bufpos = writev (ctx->sock, iov, 2); - if (ctx->param->bufpos == (size_t)-1) { - memc_log (ctx, __LINE__, "memc_write: writev failed: %s", strerror (errno)); - } - } - else { - if (write (ctx->sock, read_buf, r) == -1) { - memc_log (ctx, __LINE__, "memc_write: write failed: %s", strerror (errno)); - } - } - event_del (&ctx->mem_ev); - event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - } - else if (what == EV_READ) { - /* Read header */ - retries = 0; - while (ctx->protocol == UDP_TEXT) { - iov[0].iov_base = &header; - iov[0].iov_len = sizeof (struct memc_udp_header); - iov[1].iov_base = read_buf; - iov[1].iov_len = READ_BUFSIZ; - if ((r = readv (ctx->sock, iov, 2)) == -1) { - event_del (&ctx->mem_ev); - ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); - return; - } - if (header.req_id != ctx->count && retries < MAX_RETRIES) { - retries++; - /* Not our reply packet */ - continue; - } - break; - } - if (ctx->protocol != UDP_TEXT) { - r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); - } - /* Increment count */ - ctx->count++; - event_del (&ctx->mem_ev); - if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { - ctx->callback (ctx, OK, ctx->callback_data); - } - else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) { - ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); - } - else { - ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); - } - } - else if (what == EV_TIMEOUT) { - event_del (&ctx->mem_ev); - ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); - } -} - -/* - * Callback for our socket events - */ -static void -socket_callback (gint fd, short what, void *arg) -{ - memcached_ctx_t *ctx = (memcached_ctx_t *) arg; - - switch (ctx->op) { - case CMD_NULL: - /* Do nothing here */ - break; - case CMD_CONNECT: - /* We have write readiness after connect call, so reinit event */ - ctx->cmd = "connect"; - if (what == EV_WRITE) { - event_del (&ctx->mem_ev); - event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, NULL); - ctx->callback (ctx, OK, ctx->callback_data); - ctx->alive = 1; - } - else { - ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); - ctx->alive = 0; - } - break; - case CMD_WRITE: - write_handler (fd, what, ctx); - break; - case CMD_READ: - read_handler (fd, what, ctx); - break; - case CMD_DELETE: - delete_handler (fd, what, ctx); - break; - } -} - -/* - * Common callback function for memcached operations if no user's callback is specified - */ -static void -common_memc_callback (memcached_ctx_t * ctx, memc_error_t error, void *data) -{ - memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error)); -} - -/* - * Make socket for udp connection - */ -static gint -memc_make_udp_sock (memcached_ctx_t * ctx) -{ - struct sockaddr_in sc; - gint ofl; - - bzero (&sc, sizeof (struct sockaddr_in *)); - sc.sin_family = AF_INET; - sc.sin_port = ctx->port; - memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr)); - - ctx->sock = socket (PF_INET, SOCK_DGRAM, 0); - - if (ctx->sock == -1) { - memc_log (ctx, __LINE__, "memc_make_udp_sock: socket() failed: %s", strerror (errno)); - return -1; - } - - /* set nonblocking */ - ofl = fcntl (ctx->sock, F_GETFL, 0); - fcntl (ctx->sock, F_SETFL, ofl | O_NONBLOCK); - - /* - * Call connect to set default destination for datagrams - * May not block - */ - ctx->op = CMD_CONNECT; - event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, NULL); - return connect (ctx->sock, (struct sockaddr *)&sc, sizeof (struct sockaddr_in)); -} - -/* - * Make socket for tcp connection - */ -static gint -memc_make_tcp_sock (memcached_ctx_t * ctx) -{ - struct sockaddr_in sc; - gint ofl, r; - - bzero (&sc, sizeof (struct sockaddr_in *)); - sc.sin_family = AF_INET; - sc.sin_port = ctx->port; - memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr)); - - ctx->sock = socket (PF_INET, SOCK_STREAM, 0); - - if (ctx->sock == -1) { - memc_log (ctx, __LINE__, "memc_make_tcp_sock: socket() failed: %s", strerror (errno)); - return -1; - } - - /* set nonblocking */ - ofl = fcntl (ctx->sock, F_GETFL, 0); - fcntl (ctx->sock, F_SETFL, ofl | O_NONBLOCK); - - if ((r = connect (ctx->sock, (struct sockaddr *)&sc, sizeof (struct sockaddr_in))) == -1) { - if (errno != EINPROGRESS) { - close (ctx->sock); - ctx->sock = -1; - memc_log (ctx, __LINE__, "memc_make_tcp_sock: connect() failed: %s", strerror (errno)); - return -1; - } - } - ctx->op = CMD_CONNECT; - event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - return 0; -} - -/* - * Parse VALUE reply from server and set len argument to value returned by memcached - */ -static gint -memc_parse_header (gchar *buf, size_t * len, gchar **end) -{ - gchar *p, *c; - gint i; - - /* VALUE <key> <flags> <bytes> [<cas unique>]\r\n */ - c = strstr (buf, CRLF); - if (c == NULL) { - return -1; - } - *end = c + sizeof (CRLF) - 1; - - if (strncmp (buf, "VALUE ", sizeof ("VALUE ") - 1) == 0) { - p = buf + sizeof ("VALUE ") - 1; - - /* Read bytes value and ignore all other fields, such as flags and key */ - for (i = 0; i < 2; i++) { - while (p++ < c && *p != ' '); - - if (p > c) { - return -1; - } - } - *len = strtoul (p, &c, 10); - return 1; - } - /* If value not found memcached return just END\r\n , in this case return 0 */ - else if (strncmp (buf, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { - return 0; - } - - return -1; -} - - -/* - * Common read command handler for memcached - */ -memc_error_t -memc_read (memcached_ctx_t * ctx, const gchar *cmd, memcached_param_t * param) -{ - ctx->cmd = cmd; - ctx->op = CMD_READ; - ctx->param = param; - event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - - return OK; -} - -/* - * Common write command handler for memcached - */ -memc_error_t -memc_write (memcached_ctx_t * ctx, const gchar *cmd, memcached_param_t * param, gint expire) -{ - ctx->cmd = cmd; - ctx->op = CMD_WRITE; - ctx->param = param; - param->expire = expire; - event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - - return OK; -} - -/* - * Delete command handler - */ -memc_error_t -memc_delete (memcached_ctx_t * ctx, memcached_param_t * param) -{ - ctx->cmd = "delete"; - ctx->op = CMD_DELETE; - ctx->param = param; - event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - - return OK; -} - -/* - * Write handler for memcached mirroring - * writing is done to each memcached server - */ -memc_error_t -memc_write_mirror (memcached_ctx_t * ctx, size_t memcached_num, const gchar *cmd, memcached_param_t * param, gint expire) -{ - memc_error_t r, result = OK; - - while (memcached_num--) { - if (ctx[memcached_num].alive == 1) { - r = memc_write (&ctx[memcached_num], cmd, param, expire); - if (r != OK) { - memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r)); - result = r; - ctx[memcached_num].alive = 0; - } - } - } - - return result; -} - -/* - * Read handler for memcached mirroring - * reading is done from first active memcached server - */ -memc_error_t -memc_read_mirror (memcached_ctx_t * ctx, size_t memcached_num, const gchar *cmd, memcached_param_t * param) -{ - memc_error_t r, result = OK; - - while (memcached_num--) { - if (ctx[memcached_num].alive == 1) { - r = memc_read (&ctx[memcached_num], cmd, param); - if (r != OK) { - result = r; - if (r != NOT_EXISTS) { - ctx[memcached_num].alive = 0; - memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: cannot write read from mirror server: %s", memc_strerror (r)); - } - else { - memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: record not exists", memc_strerror (r)); - } - } - else { - break; - } - } - } - - return result; -} - -/* - * Delete handler for memcached mirroring - * deleting is done for each active memcached server - */ -memc_error_t -memc_delete_mirror (memcached_ctx_t * ctx, size_t memcached_num, const gchar *cmd, memcached_param_t * param) -{ - memc_error_t r, result = OK; - - while (memcached_num--) { - if (ctx[memcached_num].alive == 1) { - r = memc_delete (&ctx[memcached_num], param); - if (r != OK) { - result = r; - if (r != NOT_EXISTS) { - ctx[memcached_num].alive = 0; - memc_log (&ctx[memcached_num], __LINE__, "memc_delete_mirror: cannot delete from mirror server: %s", memc_strerror (r)); - } - } - } - } - - return result; -} - - -/* - * Initialize memcached context for specified protocol - */ -gint -memc_init_ctx (memcached_ctx_t * ctx) -{ - if (ctx == NULL) { - return -1; - } - - ctx->count = 0; - ctx->alive = 0; - ctx->op = CMD_NULL; - /* Set default callback */ - if (ctx->callback == NULL) { - ctx->callback = common_memc_callback; - } - - switch (ctx->protocol) { - case UDP_TEXT: - return memc_make_udp_sock (ctx); - break; - case TCP_TEXT: - return memc_make_tcp_sock (ctx); - break; - /* Not implemented */ - case UDP_BIN: - case TCP_BIN: - default: - return -1; - } -} - -/* - * Mirror init - */ -gint -memc_init_ctx_mirror (memcached_ctx_t * ctx, size_t memcached_num) -{ - gint r, result = -1; - while (memcached_num--) { - if (ctx[memcached_num].alive == 1) { - r = memc_init_ctx (&ctx[memcached_num]); - if (r == -1) { - ctx[memcached_num].alive = 0; - memc_log (&ctx[memcached_num], __LINE__, "memc_init_ctx_mirror: cannot connect to server"); - } - else { - result = 1; - } - } - } - - return result; -} - -/* - * Close context connection - */ -gint -memc_close_ctx (memcached_ctx_t * ctx) -{ - if (ctx != NULL && ctx->sock != -1) { - event_del (&ctx->mem_ev); - return close (ctx->sock); - } - - return -1; -} - -/* - * Mirror close - */ -gint -memc_close_ctx_mirror (memcached_ctx_t * ctx, size_t memcached_num) -{ - gint r = 0; - while (memcached_num--) { - if (ctx[memcached_num].alive == 1) { - r = memc_close_ctx (&ctx[memcached_num]); - if (r == -1) { - memc_log (&ctx[memcached_num], __LINE__, "memc_close_ctx_mirror: cannot close connection to server properly"); - ctx[memcached_num].alive = 0; - } - } - } - - return r; -} - - -const gchar * -memc_strerror (memc_error_t err) -{ - const gchar *p; - - switch (err) { - case OK: - p = "Ok"; - break; - case BAD_COMMAND: - p = "Bad command"; - break; - case CLIENT_ERROR: - p = "Client error"; - break; - case SERVER_ERROR: - p = "Server error"; - break; - case SERVER_TIMEOUT: - p = "Server timeout"; - break; - case NOT_EXISTS: - p = "Key not found"; - break; - case EXISTS: - p = "Key already exists"; - break; - case WRONG_LENGTH: - p = "Wrong result length"; - break; - default: - p = "Unknown error"; - break; - } - - return p; -} - -/* - * vi:ts=4 - */ diff --git a/src/libutil/memcached.h b/src/libutil/memcached.h deleted file mode 100644 index 098e26eea..000000000 --- a/src/libutil/memcached.h +++ /dev/null @@ -1,142 +0,0 @@ -#ifndef MEMCACHED_H -#define MEMCACHED_H - -#include <sys/types.h> -#include <netinet/in.h> -#include <sys/time.h> -#include <time.h> - -#define MAXKEYLEN 250 - -#define MEMC_OPT_DEBUG 0x1 - -struct event; - -typedef enum memc_error { - OK, - BAD_COMMAND, - CLIENT_ERROR, - SERVER_ERROR, - SERVER_TIMEOUT, - NOT_EXISTS, - EXISTS, - WRONG_LENGTH -} memc_error_t; - -/* XXX: Only UDP_TEXT is supported at present */ -typedef enum memc_proto { - UDP_TEXT, - TCP_TEXT, - UDP_BIN, - TCP_BIN -} memc_proto_t; - -typedef enum memc_op { - CMD_NULL, - CMD_CONNECT, - CMD_READ, - CMD_WRITE, - CMD_DELETE, -} memc_opt_t; - -typedef struct memcached_param_s { - gchar key[MAXKEYLEN]; - u_char *buf; - size_t bufsize; - size_t bufpos; - gint expire; -} memcached_param_t; - - -/* Port must be in network byte order */ -typedef struct memcached_ctx_s { - memc_proto_t protocol; - struct in_addr addr; - guint16 port; - gint sock; - struct timeval timeout; - /* Counter that is used for memcached operations in network byte order */ - guint16 count; - /* Flag that signalize that this memcached is alive */ - short alive; - /* Options that can be specified for memcached connection */ - short options; - /* Current operation */ - memc_opt_t op; - /* Current command */ - const gchar *cmd; - /* Current param */ - memcached_param_t *param; - /* Callback for current operation */ - void (*callback) (struct memcached_ctx_s *ctx, memc_error_t error, void *data); - /* Data for callback function */ - void *callback_data; - /* Event structure */ - struct event mem_ev; -} memcached_ctx_t; - -typedef void (*memcached_callback_t) (memcached_ctx_t *ctx, memc_error_t error, void *data); - -/* - * Initialize connection to memcached server: - * addr, port and timeout fields in ctx must be filled with valid values - * Return: - * 0 - success - * -1 - error (error is stored in errno) - */ -gint memc_init_ctx (memcached_ctx_t *ctx); -gint memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num); -/* - * Memcached function for getting, setting, adding values to memcached server - * ctx - valid memcached context - * key - key to extract (max 250 characters as it specified in memcached API) - * buf, elemsize, nelem - allocated buffer of length nelem structures each of elemsize - * that would contain extracted data (NOT NULL TERMINATED) - * Return: - * memc_error_t - * nelem is changed according to actual number of extracted data - * - * "set" means "store this data". - * - * "add" means "store this data, but only if the server *doesn't* already - * hold data for this key". - - * "replace" means "store this data, but only if the server *does* - * already hold data for this key". - - * "append" means "add this data to an existing key after existing data". - - * "prepend" means "add this data to an existing key before existing data". - */ -#define memc_get(ctx, param) memc_read(ctx, "get", param) -#define memc_set(ctx, param, expire) memc_write(ctx, "set", param, expire) -#define memc_add(ctx, param, expire) memc_write(ctx, "add", param, expire) -#define memc_replace(ctx, param, expire) memc_write(ctx, "replace", param, expire) -#define memc_append(ctx, param, expire) memc_write(ctx, "append", param, expire) -#define memc_prepend(ctx, param, expire) memc_write(ctx, "prepend", param, expire) - -/* Functions that works with mirror of memcached servers */ -#define memc_get_mirror(ctx, num, param) memc_read_mirror(ctx, num, "get", param) -#define memc_set_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "set", param, expire) -#define memc_add_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "add", param, expire) -#define memc_replace_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "replace", param, expire) -#define memc_append_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "append", param, expire) -#define memc_prepend_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "prepend", param, expire) - - -memc_error_t memc_read (memcached_ctx_t *ctx, const gchar *cmd, memcached_param_t *param); -memc_error_t memc_write (memcached_ctx_t *ctx, const gchar *cmd, memcached_param_t *param, gint expire); -memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params); - -memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const gchar *cmd, memcached_param_t *param, gint expire); -memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const gchar *cmd, memcached_param_t *param); -memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const gchar *cmd, memcached_param_t *param); - -/* Return symbolic name of memcached error*/ -const gchar * memc_strerror (memc_error_t err); - -/* Destroy socket from ctx */ -gint memc_close_ctx (memcached_ctx_t *ctx); -gint memc_close_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num); - -#endif diff --git a/src/main.h b/src/main.h index 9b4aebe38..df3792d13 100644 --- a/src/main.h +++ b/src/main.h @@ -11,7 +11,6 @@ #include "mem_pool.h" #include "statfile.h" #include "url.h" -#include "memcached.h" #include "protocol.h" #include "filter.h" #include "buffer.h" diff --git a/src/memcached-test.c b/src/memcached-test.c deleted file mode 100644 index 53bb4c0d3..000000000 --- a/src/memcached-test.c +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2009-2012, Vsevolod Stakhov - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#include <stdio.h> -#include <stdlib.h> -#include <netdb.h> -#include <errno.h> -#include <string.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <event.h> - -#include "upstream.h" -#include "memcached.h" - -#define HOST "127.0.0.1" -#define PORT 11211 - -memcached_param_t cur_param; - -static void -test_memc_callback (memcached_ctx_t * ctx, memc_error_t error, void *data) -{ - gint s; - gint r; - gint *num = ((gint *)data); - printf ("result of memc command '%s' is '%s'\n", ctx->cmd, memc_strerror (error)); - /* Connect */ - if (*num == 0) { - printf ("Setting value to memcached: %s -> %s\n", cur_param.key, (gchar *)cur_param.buf); - s = 1; - r = memc_set (ctx, &cur_param, &s, 60); - (*num)++; - } - else if (*num == 1) { - printf ("Getting value from memcached: %s -> %s\n", cur_param.key, (gchar *)cur_param.buf); - s = 1; - r = memc_get (ctx, &cur_param, &s); - (*num)++; - } - else { - printf ("Got value from memcached: %s -> %s\n", cur_param.key, (gchar *)cur_param.buf); - event_loopexit (NULL); - } -} - - -gint -main (gint argc, gchar **argv) -{ - memcached_ctx_t mctx; - gchar *addr, buf[512]; - gint num = 0; - - event_init (); - strcpy (cur_param.key, "testkey"); - strcpy (buf, "test_value"); - cur_param.buf = buf; - cur_param.bufsize = sizeof ("test_value") - 1; - - if (argc == 2) { - addr = argv[1]; - } - else { - addr = HOST; - } - - mctx.protocol = TCP_TEXT; - mctx.timeout.tv_sec = 1; - mctx.timeout.tv_usec = 0; - mctx.port = htons (PORT); - mctx.options = MEMC_OPT_DEBUG; - mctx.callback = test_memc_callback; - /* XXX: it is wrong to use local variable pointer here */ - mctx.callback_data = (void *)# - inet_aton (addr, &mctx.addr); - - memc_init_ctx (&mctx); - - event_loop (0); - return 0; -} diff --git a/src/plugins/surbl.c b/src/plugins/surbl.c index 41873e15d..29c557b92 100644 --- a/src/plugins/surbl.c +++ b/src/plugins/surbl.c @@ -706,106 +706,6 @@ dns_callback (struct rdns_reply *reply, gpointer arg) } static void -memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data) -{ - struct memcached_param *param = (struct memcached_param *)data; - gint *url_count; - - switch (ctx->op) { - case CMD_CONNECT: - if (error != OK) { - msg_info ("memcached returned error %s on CONNECT stage", memc_strerror (error)); - memc_close_ctx (param->ctx); - } - else { - memc_get (param->ctx, param->ctx->param); - } - break; - case CMD_READ: - if (error != OK) { - msg_info ("memcached returned error %s on READ stage", memc_strerror (error)); - memc_close_ctx (param->ctx); - } - else { - url_count = (gint *)param->ctx->param->buf; - /* Do not check DNS for urls that have count more than max_urls */ - if (*url_count > (gint)surbl_module_ctx->max_urls) { - msg_info ("url '%s' has count %d, max: %d", struri (param->url), *url_count, surbl_module_ctx->max_urls); - /* - * XXX: try to understand why we should use memcached here - * insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 1); - */ - } - (*url_count)++; - memc_set (param->ctx, param->ctx->param, surbl_module_ctx->url_expire); - } - break; - case CMD_WRITE: - if (error != OK) { - msg_info ("memcached returned error %s on WRITE stage", memc_strerror (error)); - } - memc_close_ctx (param->ctx); - make_surbl_requests (param->url, param->task, param->suffix, FALSE, param->tree); - break; - default: - return; - } -} - -static void -register_memcached_call (struct uri *url, struct rspamd_task *task, - struct suffix_item *suffix, GTree *tree) -{ - struct memcached_param *param; - struct memcached_server *selected; - memcached_param_t *cur_param; - gchar *sum_str; - gint *url_count; - - param = rspamd_mempool_alloc (task->task_pool, sizeof (struct memcached_param)); - cur_param = rspamd_mempool_alloc0 (task->task_pool, sizeof (memcached_param_t)); - url_count = rspamd_mempool_alloc (task->task_pool, sizeof (gint)); - - param->url = url; - param->task = task; - param->suffix = suffix; - param->tree = tree; - - param->ctx = rspamd_mempool_alloc0 (task->task_pool, sizeof (memcached_ctx_t)); - - cur_param->buf = (gchar *) url_count; - cur_param->bufsize = sizeof (gint); - - sum_str = g_compute_checksum_for_string (G_CHECKSUM_MD5, struri (url), -1); - rspamd_strlcpy (cur_param->key, sum_str, sizeof (cur_param->key)); - g_free (sum_str); - - selected = (struct memcached_server *)get_upstream_by_hash ((void *)task->cfg->memcached_servers, - task->cfg->memcached_servers_num, sizeof (struct memcached_server), - time (NULL), task->cfg->memcached_error_time, task->cfg->memcached_dead_time, task->cfg->memcached_maxerrors, cur_param->key, strlen (cur_param->key)); - if (selected == NULL) { - msg_err ("no memcached servers can be selected"); - return; - } - param->ctx->callback = memcached_callback; - param->ctx->callback_data = (void *)param; - param->ctx->protocol = task->cfg->memcached_protocol; - memcpy (¶m->ctx->addr, &selected->addr, sizeof (struct in_addr)); - param->ctx->port = selected->port; - param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout / 1000; - param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout - param->ctx->timeout.tv_sec * 1000; - param->ctx->sock = -1; - -#ifdef WITH_DEBUG - param->ctx->options = MEMC_OPT_DEBUG; -#else - param->ctx->options = 0; -#endif - param->ctx->param = cur_param; - memc_init_ctx (param->ctx); -} - -static void free_redirector_session (void *ud) { struct redirector_param *param = (struct redirector_param *)ud; @@ -990,12 +890,7 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data) make_surbl_requests (url, param->task, param->suffix, FALSE, param->tree); } else { - if (param->task->worker->srv->cfg->memcached_servers_num > 0) { - register_memcached_call (url, param->task, param->suffix, param->tree); - } - else { - make_surbl_requests (url, param->task, param->suffix, FALSE, param->tree); - } + make_surbl_requests (url, param->task, param->suffix, FALSE, param->tree); } return FALSE; diff --git a/src/plugins/surbl.h b/src/plugins/surbl.h index a9b0c194c..912e7fb11 100644 --- a/src/plugins/surbl.h +++ b/src/plugins/surbl.h @@ -4,7 +4,6 @@ #include "config.h" #include "main.h" #include "cfg_file.h" -#include "memcached.h" #include "trie.h" #define DEFAULT_REDIRECTOR_PORT 8080 @@ -77,14 +76,6 @@ struct redirector_param { struct suffix_item *suffix; }; -struct memcached_param { - struct uri *url; - struct rspamd_task *task; - memcached_ctx_t *ctx; - GTree *tree; - struct suffix_item *suffix; -}; - struct surbl_bit_item { guint32 bit; const gchar *symbol; |