Browse Source

Remove memcached support.

tags/0.7.0
Vsevolod Stakhov 10 years ago
parent
commit
40c6406e4a

+ 5
- 5
config.h.in View File

@@ -461,19 +461,19 @@ typedef off_t goffset;

/* Forwarded declaration */
struct module_ctx;
struct config_file;
struct rspamd_config;
struct rspamd_worker;

typedef struct module_s {
const gchar *name;
int (*module_init_func)(struct config_file *cfg, struct module_ctx **ctx);
int (*module_config_func)(struct config_file *cfg);
int (*module_reconfig_func)(struct config_file *cfg);
int (*module_init_func)(struct rspamd_config *cfg, struct module_ctx **ctx);
int (*module_config_func)(struct rspamd_config *cfg);
int (*module_reconfig_func)(struct rspamd_config *cfg);
} module_t;

typedef struct worker_s {
const gchar *name;
gpointer (*worker_init_func)(struct config_file *cfg);
gpointer (*worker_init_func)(struct rspamd_config *cfg);
void (*worker_start_func)(struct rspamd_worker *worker);
gboolean has_socket;
gboolean unique;

+ 0
- 24
src/libserver/cfg_file.h View File

@@ -9,7 +9,6 @@
#include "config.h"
#include "mem_pool.h"
#include "upstream.h"
#include "memcached.h"
#include "symbols_cache.h"
#include "cfg_rcl.h"
#include "utlist.h"
@@ -17,10 +16,6 @@

#define DEFAULT_BIND_PORT 11333
#define DEFAULT_CONTROL_PORT 11334
#define MAX_MEMCACHED_SERVERS 4
#define DEFAULT_MEMCACHED_PORT 11211
/* Memcached timeouts */
#define DEFAULT_MEMCACHED_CONNECT_TIMEOUT 1000
/* Upstream timeouts */
#define DEFAULT_UPSTREAM_ERROR_TIME 10
#define DEFAULT_UPSTREAM_ERROR_TIME 10
@@ -87,17 +82,6 @@ struct rspamd_regexp {
gboolean is_strong; /**< true if headers search must be case sensitive */
};

/**
* Memcached server object
*/
struct memcached_server {
struct upstream up; /**< common upstream base */
struct in_addr addr; /**< address of server */
guint16 port; /**< port to connect */
short alive; /**< is this server alive */
gint16 num; /**< number of servers in case of mirror */
};

/**
* script module list item
*/
@@ -299,14 +283,6 @@ struct rspamd_config {
guint32 statfile_sync_timeout; /**< synchronization timeout */
gboolean mlock_statfile_pool; /**< use mlock (2) for locking statfiles */

struct memcached_server memcached_servers[MAX_MEMCACHED_SERVERS]; /**< memcached servers */
gsize memcached_servers_num; /**< number of memcached servers */
memc_proto_t memcached_protocol; /**< memcached protocol */
guint memcached_error_time; /**< memcached error time (see upstream documentation) */
guint memcached_dead_time; /**< memcached dead time */
guint memcached_maxerrors; /**< maximum number of errors */
guint memcached_connect_timeout; /**< connection timeout */

gboolean delivery_enable; /**< is delivery agent is enabled */
gchar *deliver_host; /**< host for mail deliviring */
struct in_addr deliver_addr; /**< its address */

+ 0
- 6
src/libserver/cfg_utils.c View File

@@ -261,12 +261,6 @@ rspamd_parse_bind_line (struct rspamd_config *cfg, struct rspamd_worker_conf *cf
void
rspamd_config_defaults (struct rspamd_config *cfg)
{

cfg->memcached_error_time = DEFAULT_UPSTREAM_ERROR_TIME;
cfg->memcached_dead_time = DEFAULT_UPSTREAM_DEAD_TIME;
cfg->memcached_maxerrors = DEFAULT_UPSTREAM_MAXERRORS;
cfg->memcached_protocol = TCP_TEXT;

cfg->dns_timeout = 1000;
cfg->dns_retransmits = 5;
/* After 20 errors do throttling for 10 seconds */

+ 0
- 1
src/libutil/CMakeLists.txt View File

@@ -8,7 +8,6 @@ SET(LIBRSPAMDUTILSRC aio_event.c
http.c
logger.c
map.c
memcached.c
mem_pool.c
printf.c
radix.c

+ 0
- 831
src/libutil/memcached.c View File

@@ -1,831 +0,0 @@
/*
* Copyright (c) 2009-2012, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#ifdef _THREAD_SAFE
# include <pthread.h>
#endif

#include <stdarg.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sysexits.h>
#include <unistd.h>
#include <syslog.h>

#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/poll.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <event.h>
#include <glib.h>

#include "memcached.h"

#define CRLF "\r\n"
#define END_TRAILER "END" CRLF
#define STORED_TRAILER "STORED" CRLF
#define NOT_STORED_TRAILER "NOT STORED" CRLF
#define EXISTS_TRAILER "EXISTS" CRLF
#define DELETED_TRAILER "DELETED" CRLF
#define NOT_FOUND_TRAILER "NOT_FOUND" CRLF
#define CLIENT_ERROR_TRAILER "CLIENT_ERROR"
#define SERVER_ERROR_TRAILER "SERVER_ERROR"

#define READ_BUFSIZ 1500
#define MAX_RETRIES 3

/* Header for udp protocol */
struct memc_udp_header {
guint16 req_id;
guint16 seq_num;
guint16 dg_sent;
guint16 unused;
};

static void socket_callback (gint fd, short what, void *arg);
static gint memc_parse_header (gchar *buf, size_t * len, gchar **end);

/*
* Write to syslog if OPT_DEBUG is specified
*/
static void
memc_log (const memcached_ctx_t * ctx, gint line, const gchar *fmt, ...)
{
va_list args;
if (ctx->options & MEMC_OPT_DEBUG) {
va_start (args, fmt);
g_log (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
g_logv (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, fmt, args);
va_end (args);
}
}

/*
* Callback for write command
*/
static void
write_handler (gint fd, short what, memcached_ctx_t * ctx)
{
gchar read_buf[READ_BUFSIZ];
gint retries;
ssize_t r;
struct memc_udp_header header;
struct iovec iov[4];

/* Write something to memcached */
if (what == EV_WRITE) {
if (ctx->protocol == UDP_TEXT) {
/* Send udp header */
bzero (&header, sizeof (header));
header.dg_sent = htons (1);
header.req_id = ctx->count;
}

r = snprintf (read_buf, READ_BUFSIZ, "%s %s 0 %d %zu" CRLF, ctx->cmd, ctx->param->key, ctx->param->expire, ctx->param->bufsize);
memc_log (ctx, __LINE__, "memc_write: send write request to memcached: %s", read_buf);

if (ctx->protocol == UDP_TEXT) {
iov[0].iov_base = &header;
iov[0].iov_len = sizeof (struct memc_udp_header);
if (ctx->param->bufpos == 0) {
iov[1].iov_base = read_buf;
iov[1].iov_len = r;
}
else {
iov[1].iov_base = NULL;
iov[1].iov_len = 0;
}
iov[2].iov_base = ctx->param->buf + ctx->param->bufpos;
iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos;
iov[3].iov_base = CRLF;
iov[3].iov_len = sizeof (CRLF) - 1;
if (writev (ctx->sock, iov, 4) == -1) {
memc_log (ctx, __LINE__, "memc_write: writev failed: %s", strerror (errno));
}
}
else {
iov[0].iov_base = read_buf;
iov[0].iov_len = r;
iov[1].iov_base = ctx->param->buf + ctx->param->bufpos;
iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos;
iov[2].iov_base = CRLF;
iov[2].iov_len = sizeof (CRLF) - 1;
if (writev (ctx->sock, iov, 3) == -1) {
memc_log (ctx, __LINE__, "memc_write: writev failed: %s", strerror (errno));
}
}
event_del (&ctx->mem_ev);
event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);
}
else if (what == EV_READ) {
/* Read header */
retries = 0;
while (ctx->protocol == UDP_TEXT) {
iov[0].iov_base = &header;
iov[0].iov_len = sizeof (struct memc_udp_header);
iov[1].iov_base = read_buf;
iov[1].iov_len = READ_BUFSIZ;
if ((r = readv (ctx->sock, iov, 2)) == -1) {
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
}
if (header.req_id != ctx->count && retries < MAX_RETRIES) {
retries++;
/* Not our reply packet */
continue;
}
break;
}
if (ctx->protocol != UDP_TEXT) {
r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
}
memc_log (ctx, __LINE__, "memc_write: read reply from memcached: %s", read_buf);
/* Increment count */
ctx->count++;
event_del (&ctx->mem_ev);
if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
ctx->callback (ctx, OK, ctx->callback_data);
}
else if (strncmp (read_buf, NOT_STORED_TRAILER, sizeof (NOT_STORED_TRAILER) - 1) == 0) {
ctx->callback (ctx, CLIENT_ERROR, ctx->callback_data);
}
else if (strncmp (read_buf, EXISTS_TRAILER, sizeof (EXISTS_TRAILER) - 1) == 0) {
ctx->callback (ctx, EXISTS, ctx->callback_data);
}
else {
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
}
}
else if (what == EV_TIMEOUT) {
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
}
}

/*
* Callback for read command
*/
static void
read_handler (gint fd, short what, memcached_ctx_t * ctx)
{
gchar read_buf[READ_BUFSIZ];
gchar *p;
ssize_t r;
size_t datalen;
struct memc_udp_header header;
struct iovec iov[2];
gint retries = 0, t;

if (what == EV_WRITE) {
/* Send command to memcached */
if (ctx->protocol == UDP_TEXT) {
/* Send udp header */
bzero (&header, sizeof (header));
header.dg_sent = htons (1);
header.req_id = ctx->count;
}

r = snprintf (read_buf, READ_BUFSIZ, "%s %s" CRLF, ctx->cmd, ctx->param->key);
memc_log (ctx, __LINE__, "memc_read: send read request to memcached: %s", read_buf);
if (ctx->protocol == UDP_TEXT) {
iov[0].iov_base = &header;
iov[0].iov_len = sizeof (struct memc_udp_header);
iov[1].iov_base = read_buf;
iov[1].iov_len = r;
if (writev (ctx->sock, iov, 2) == -1) {
memc_log (ctx, __LINE__, "memc_write: writev failed: %s", strerror (errno));
}
}
else {
if (write (ctx->sock, read_buf, r) == -1) {
memc_log (ctx, __LINE__, "memc_write: write failed: %s", strerror (errno));
}
}
event_del (&ctx->mem_ev);
event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);
}
else if (what == EV_READ) {
while (ctx->protocol == UDP_TEXT) {
iov[0].iov_base = &header;
iov[0].iov_len = sizeof (struct memc_udp_header);
iov[1].iov_base = read_buf;
iov[1].iov_len = READ_BUFSIZ;
if ((r = readv (ctx->sock, iov, 2)) == -1) {
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
return;
}
memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
if (header.req_id != ctx->count && retries < MAX_RETRIES) {
memc_log (ctx, __LINE__, "memc_read: got wrong packet id: %d, %d was awaited", header.req_id, ctx->count);
retries++;
/* Not our reply packet */
continue;
}
break;
}
if (ctx->protocol != UDP_TEXT) {
r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
}

if (r > 0) {
read_buf[r] = 0;
if (ctx->param->bufpos == 0) {
t = memc_parse_header (read_buf, &datalen, &p);
if (t < 0) {
event_del (&ctx->mem_ev);
memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
return;
}
else if (t == 0) {
memc_log (ctx, __LINE__, "memc_read: record does not exists");
event_del (&ctx->mem_ev);
ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
return;
}

if (datalen > ctx->param->bufsize) {
memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen);
event_del (&ctx->mem_ev);
ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data);
return;
}
/* Check if we already have all data in buffer */
if (r >= (ssize_t)(datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2)) {
/* Store all data in param's buffer */
memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen);
/* Increment count */
ctx->count++;
event_del (&ctx->mem_ev);
ctx->callback (ctx, OK, ctx->callback_data);
return;
}
/* Subtract from sum parsed header's length */
r -= p - read_buf;
}
else {
p = read_buf;
}

if (strncmp (ctx->param->buf + ctx->param->bufpos + r - sizeof (END_TRAILER) - sizeof (CRLF) + 2, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2;
memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
event_del (&ctx->mem_ev);
ctx->callback (ctx, OK, ctx->callback_data);
return;
}
/* Store this part of data in param's buffer */
memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
ctx->param->bufpos += r;
}
else {
memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %s", r, strerror (errno));
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
return;
}

ctx->count++;
}
else if (what == EV_TIMEOUT) {
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
}

}

/*
* Callback for delete command
*/
static void
delete_handler (gint fd, short what, memcached_ctx_t * ctx)
{
gchar read_buf[READ_BUFSIZ];
gint retries;
ssize_t r;
struct memc_udp_header header;
struct iovec iov[2];

/* Write something to memcached */
if (what == EV_WRITE) {
if (ctx->protocol == UDP_TEXT) {
/* Send udp header */
bzero (&header, sizeof (header));
header.dg_sent = htons (1);
header.req_id = ctx->count;
}
r = snprintf (read_buf, READ_BUFSIZ, "delete %s" CRLF, ctx->param->key);
memc_log (ctx, __LINE__, "memc_delete: send delete request to memcached: %s", read_buf);

if (ctx->protocol == UDP_TEXT) {
iov[0].iov_base = &header;
iov[0].iov_len = sizeof (struct memc_udp_header);
iov[1].iov_base = read_buf;
iov[1].iov_len = r;
ctx->param->bufpos = writev (ctx->sock, iov, 2);
if (ctx->param->bufpos == (size_t)-1) {
memc_log (ctx, __LINE__, "memc_write: writev failed: %s", strerror (errno));
}
}
else {
if (write (ctx->sock, read_buf, r) == -1) {
memc_log (ctx, __LINE__, "memc_write: write failed: %s", strerror (errno));
}
}
event_del (&ctx->mem_ev);
event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);
}
else if (what == EV_READ) {
/* Read header */
retries = 0;
while (ctx->protocol == UDP_TEXT) {
iov[0].iov_base = &header;
iov[0].iov_len = sizeof (struct memc_udp_header);
iov[1].iov_base = read_buf;
iov[1].iov_len = READ_BUFSIZ;
if ((r = readv (ctx->sock, iov, 2)) == -1) {
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
return;
}
if (header.req_id != ctx->count && retries < MAX_RETRIES) {
retries++;
/* Not our reply packet */
continue;
}
break;
}
if (ctx->protocol != UDP_TEXT) {
r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
}
/* Increment count */
ctx->count++;
event_del (&ctx->mem_ev);
if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
ctx->callback (ctx, OK, ctx->callback_data);
}
else if (strncmp (read_buf, NOT_FOUND_TRAILER, sizeof (NOT_FOUND_TRAILER) - 1) == 0) {
ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
}
else {
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
}
}
else if (what == EV_TIMEOUT) {
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
}
}

/*
* Callback for our socket events
*/
static void
socket_callback (gint fd, short what, void *arg)
{
memcached_ctx_t *ctx = (memcached_ctx_t *) arg;

switch (ctx->op) {
case CMD_NULL:
/* Do nothing here */
break;
case CMD_CONNECT:
/* We have write readiness after connect call, so reinit event */
ctx->cmd = "connect";
if (what == EV_WRITE) {
event_del (&ctx->mem_ev);
event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, NULL);
ctx->callback (ctx, OK, ctx->callback_data);
ctx->alive = 1;
}
else {
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
ctx->alive = 0;
}
break;
case CMD_WRITE:
write_handler (fd, what, ctx);
break;
case CMD_READ:
read_handler (fd, what, ctx);
break;
case CMD_DELETE:
delete_handler (fd, what, ctx);
break;
}
}

/*
* Common callback function for memcached operations if no user's callback is specified
*/
static void
common_memc_callback (memcached_ctx_t * ctx, memc_error_t error, void *data)
{
memc_log (ctx, __LINE__, "common_memc_callback: result of memc command '%s' is '%s'", ctx->cmd, memc_strerror (error));
}

/*
* Make socket for udp connection
*/
static gint
memc_make_udp_sock (memcached_ctx_t * ctx)
{
struct sockaddr_in sc;
gint ofl;

bzero (&sc, sizeof (struct sockaddr_in *));
sc.sin_family = AF_INET;
sc.sin_port = ctx->port;
memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr));

ctx->sock = socket (PF_INET, SOCK_DGRAM, 0);

if (ctx->sock == -1) {
memc_log (ctx, __LINE__, "memc_make_udp_sock: socket() failed: %s", strerror (errno));
return -1;
}

/* set nonblocking */
ofl = fcntl (ctx->sock, F_GETFL, 0);
fcntl (ctx->sock, F_SETFL, ofl | O_NONBLOCK);

/*
* Call connect to set default destination for datagrams
* May not block
*/
ctx->op = CMD_CONNECT;
event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, NULL);
return connect (ctx->sock, (struct sockaddr *)&sc, sizeof (struct sockaddr_in));
}

/*
* Make socket for tcp connection
*/
static gint
memc_make_tcp_sock (memcached_ctx_t * ctx)
{
struct sockaddr_in sc;
gint ofl, r;

bzero (&sc, sizeof (struct sockaddr_in *));
sc.sin_family = AF_INET;
sc.sin_port = ctx->port;
memcpy (&sc.sin_addr, &ctx->addr, sizeof (struct in_addr));

ctx->sock = socket (PF_INET, SOCK_STREAM, 0);

if (ctx->sock == -1) {
memc_log (ctx, __LINE__, "memc_make_tcp_sock: socket() failed: %s", strerror (errno));
return -1;
}

/* set nonblocking */
ofl = fcntl (ctx->sock, F_GETFL, 0);
fcntl (ctx->sock, F_SETFL, ofl | O_NONBLOCK);

if ((r = connect (ctx->sock, (struct sockaddr *)&sc, sizeof (struct sockaddr_in))) == -1) {
if (errno != EINPROGRESS) {
close (ctx->sock);
ctx->sock = -1;
memc_log (ctx, __LINE__, "memc_make_tcp_sock: connect() failed: %s", strerror (errno));
return -1;
}
}
ctx->op = CMD_CONNECT;
event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);
return 0;
}

/*
* Parse VALUE reply from server and set len argument to value returned by memcached
*/
static gint
memc_parse_header (gchar *buf, size_t * len, gchar **end)
{
gchar *p, *c;
gint i;

/* VALUE <key> <flags> <bytes> [<cas unique>]\r\n */
c = strstr (buf, CRLF);
if (c == NULL) {
return -1;
}
*end = c + sizeof (CRLF) - 1;

if (strncmp (buf, "VALUE ", sizeof ("VALUE ") - 1) == 0) {
p = buf + sizeof ("VALUE ") - 1;

/* Read bytes value and ignore all other fields, such as flags and key */
for (i = 0; i < 2; i++) {
while (p++ < c && *p != ' ');

if (p > c) {
return -1;
}
}
*len = strtoul (p, &c, 10);
return 1;
}
/* If value not found memcached return just END\r\n , in this case return 0 */
else if (strncmp (buf, END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
return 0;
}

return -1;
}


/*
* Common read command handler for memcached
*/
memc_error_t
memc_read (memcached_ctx_t * ctx, const gchar *cmd, memcached_param_t * param)
{
ctx->cmd = cmd;
ctx->op = CMD_READ;
ctx->param = param;
event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);

return OK;
}

/*
* Common write command handler for memcached
*/
memc_error_t
memc_write (memcached_ctx_t * ctx, const gchar *cmd, memcached_param_t * param, gint expire)
{
ctx->cmd = cmd;
ctx->op = CMD_WRITE;
ctx->param = param;
param->expire = expire;
event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);

return OK;
}

/*
* Delete command handler
*/
memc_error_t
memc_delete (memcached_ctx_t * ctx, memcached_param_t * param)
{
ctx->cmd = "delete";
ctx->op = CMD_DELETE;
ctx->param = param;
event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);

return OK;
}

/*
* Write handler for memcached mirroring
* writing is done to each memcached server
*/
memc_error_t
memc_write_mirror (memcached_ctx_t * ctx, size_t memcached_num, const gchar *cmd, memcached_param_t * param, gint expire)
{
memc_error_t r, result = OK;

while (memcached_num--) {
if (ctx[memcached_num].alive == 1) {
r = memc_write (&ctx[memcached_num], cmd, param, expire);
if (r != OK) {
memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r));
result = r;
ctx[memcached_num].alive = 0;
}
}
}

return result;
}

/*
* Read handler for memcached mirroring
* reading is done from first active memcached server
*/
memc_error_t
memc_read_mirror (memcached_ctx_t * ctx, size_t memcached_num, const gchar *cmd, memcached_param_t * param)
{
memc_error_t r, result = OK;

while (memcached_num--) {
if (ctx[memcached_num].alive == 1) {
r = memc_read (&ctx[memcached_num], cmd, param);
if (r != OK) {
result = r;
if (r != NOT_EXISTS) {
ctx[memcached_num].alive = 0;
memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: cannot write read from mirror server: %s", memc_strerror (r));
}
else {
memc_log (&ctx[memcached_num], __LINE__, "memc_read_mirror: record not exists", memc_strerror (r));
}
}
else {
break;
}
}
}

return result;
}

/*
* Delete handler for memcached mirroring
* deleting is done for each active memcached server
*/
memc_error_t
memc_delete_mirror (memcached_ctx_t * ctx, size_t memcached_num, const gchar *cmd, memcached_param_t * param)
{
memc_error_t r, result = OK;

while (memcached_num--) {
if (ctx[memcached_num].alive == 1) {
r = memc_delete (&ctx[memcached_num], param);
if (r != OK) {
result = r;
if (r != NOT_EXISTS) {
ctx[memcached_num].alive = 0;
memc_log (&ctx[memcached_num], __LINE__, "memc_delete_mirror: cannot delete from mirror server: %s", memc_strerror (r));
}
}
}
}

return result;
}


/*
* Initialize memcached context for specified protocol
*/
gint
memc_init_ctx (memcached_ctx_t * ctx)
{
if (ctx == NULL) {
return -1;
}

ctx->count = 0;
ctx->alive = 0;
ctx->op = CMD_NULL;
/* Set default callback */
if (ctx->callback == NULL) {
ctx->callback = common_memc_callback;
}

switch (ctx->protocol) {
case UDP_TEXT:
return memc_make_udp_sock (ctx);
break;
case TCP_TEXT:
return memc_make_tcp_sock (ctx);
break;
/* Not implemented */
case UDP_BIN:
case TCP_BIN:
default:
return -1;
}
}

/*
* Mirror init
*/
gint
memc_init_ctx_mirror (memcached_ctx_t * ctx, size_t memcached_num)
{
gint r, result = -1;
while (memcached_num--) {
if (ctx[memcached_num].alive == 1) {
r = memc_init_ctx (&ctx[memcached_num]);
if (r == -1) {
ctx[memcached_num].alive = 0;
memc_log (&ctx[memcached_num], __LINE__, "memc_init_ctx_mirror: cannot connect to server");
}
else {
result = 1;
}
}
}

return result;
}

/*
* Close context connection
*/
gint
memc_close_ctx (memcached_ctx_t * ctx)
{
if (ctx != NULL && ctx->sock != -1) {
event_del (&ctx->mem_ev);
return close (ctx->sock);
}

return -1;
}

/*
* Mirror close
*/
gint
memc_close_ctx_mirror (memcached_ctx_t * ctx, size_t memcached_num)
{
gint r = 0;
while (memcached_num--) {
if (ctx[memcached_num].alive == 1) {
r = memc_close_ctx (&ctx[memcached_num]);
if (r == -1) {
memc_log (&ctx[memcached_num], __LINE__, "memc_close_ctx_mirror: cannot close connection to server properly");
ctx[memcached_num].alive = 0;
}
}
}

return r;
}


const gchar *
memc_strerror (memc_error_t err)
{
const gchar *p;

switch (err) {
case OK:
p = "Ok";
break;
case BAD_COMMAND:
p = "Bad command";
break;
case CLIENT_ERROR:
p = "Client error";
break;
case SERVER_ERROR:
p = "Server error";
break;
case SERVER_TIMEOUT:
p = "Server timeout";
break;
case NOT_EXISTS:
p = "Key not found";
break;
case EXISTS:
p = "Key already exists";
break;
case WRONG_LENGTH:
p = "Wrong result length";
break;
default:
p = "Unknown error";
break;
}

return p;
}

/*
* vi:ts=4
*/

+ 0
- 142
src/libutil/memcached.h View File

@@ -1,142 +0,0 @@
#ifndef MEMCACHED_H
#define MEMCACHED_H

#include <sys/types.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <time.h>

#define MAXKEYLEN 250

#define MEMC_OPT_DEBUG 0x1

struct event;

typedef enum memc_error {
OK,
BAD_COMMAND,
CLIENT_ERROR,
SERVER_ERROR,
SERVER_TIMEOUT,
NOT_EXISTS,
EXISTS,
WRONG_LENGTH
} memc_error_t;

/* XXX: Only UDP_TEXT is supported at present */
typedef enum memc_proto {
UDP_TEXT,
TCP_TEXT,
UDP_BIN,
TCP_BIN
} memc_proto_t;

typedef enum memc_op {
CMD_NULL,
CMD_CONNECT,
CMD_READ,
CMD_WRITE,
CMD_DELETE,
} memc_opt_t;

typedef struct memcached_param_s {
gchar key[MAXKEYLEN];
u_char *buf;
size_t bufsize;
size_t bufpos;
gint expire;
} memcached_param_t;


/* Port must be in network byte order */
typedef struct memcached_ctx_s {
memc_proto_t protocol;
struct in_addr addr;
guint16 port;
gint sock;
struct timeval timeout;
/* Counter that is used for memcached operations in network byte order */
guint16 count;
/* Flag that signalize that this memcached is alive */
short alive;
/* Options that can be specified for memcached connection */
short options;
/* Current operation */
memc_opt_t op;
/* Current command */
const gchar *cmd;
/* Current param */
memcached_param_t *param;
/* Callback for current operation */
void (*callback) (struct memcached_ctx_s *ctx, memc_error_t error, void *data);
/* Data for callback function */
void *callback_data;
/* Event structure */
struct event mem_ev;
} memcached_ctx_t;

typedef void (*memcached_callback_t) (memcached_ctx_t *ctx, memc_error_t error, void *data);

/*
* Initialize connection to memcached server:
* addr, port and timeout fields in ctx must be filled with valid values
* Return:
* 0 - success
* -1 - error (error is stored in errno)
*/
gint memc_init_ctx (memcached_ctx_t *ctx);
gint memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num);
/*
* Memcached function for getting, setting, adding values to memcached server
* ctx - valid memcached context
* key - key to extract (max 250 characters as it specified in memcached API)
* buf, elemsize, nelem - allocated buffer of length nelem structures each of elemsize
* that would contain extracted data (NOT NULL TERMINATED)
* Return:
* memc_error_t
* nelem is changed according to actual number of extracted data
*
* "set" means "store this data".
*
* "add" means "store this data, but only if the server *doesn't* already
* hold data for this key".

* "replace" means "store this data, but only if the server *does*
* already hold data for this key".

* "append" means "add this data to an existing key after existing data".

* "prepend" means "add this data to an existing key before existing data".
*/
#define memc_get(ctx, param) memc_read(ctx, "get", param)
#define memc_set(ctx, param, expire) memc_write(ctx, "set", param, expire)
#define memc_add(ctx, param, expire) memc_write(ctx, "add", param, expire)
#define memc_replace(ctx, param, expire) memc_write(ctx, "replace", param, expire)
#define memc_append(ctx, param, expire) memc_write(ctx, "append", param, expire)
#define memc_prepend(ctx, param, expire) memc_write(ctx, "prepend", param, expire)

/* Functions that works with mirror of memcached servers */
#define memc_get_mirror(ctx, num, param) memc_read_mirror(ctx, num, "get", param)
#define memc_set_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "set", param, expire)
#define memc_add_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "add", param, expire)
#define memc_replace_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "replace", param, expire)
#define memc_append_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "append", param, expire)
#define memc_prepend_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "prepend", param, expire)


memc_error_t memc_read (memcached_ctx_t *ctx, const gchar *cmd, memcached_param_t *param);
memc_error_t memc_write (memcached_ctx_t *ctx, const gchar *cmd, memcached_param_t *param, gint expire);
memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params);

memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const gchar *cmd, memcached_param_t *param, gint expire);
memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const gchar *cmd, memcached_param_t *param);
memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const gchar *cmd, memcached_param_t *param);

/* Return symbolic name of memcached error*/
const gchar * memc_strerror (memc_error_t err);

/* Destroy socket from ctx */
gint memc_close_ctx (memcached_ctx_t *ctx);
gint memc_close_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num);

#endif

+ 0
- 1
src/main.h View File

@@ -11,7 +11,6 @@
#include "mem_pool.h"
#include "statfile.h"
#include "url.h"
#include "memcached.h"
#include "protocol.h"
#include "filter.h"
#include "buffer.h"

+ 0
- 103
src/memcached-test.c View File

@@ -1,103 +0,0 @@
/*
* Copyright (c) 2009-2012, Vsevolod Stakhov
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHOR ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL AUTHOR BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <stdio.h>
#include <stdlib.h>
#include <netdb.h>
#include <errno.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <event.h>

#include "upstream.h"
#include "memcached.h"

#define HOST "127.0.0.1"
#define PORT 11211

memcached_param_t cur_param;

static void
test_memc_callback (memcached_ctx_t * ctx, memc_error_t error, void *data)
{
gint s;
gint r;
gint *num = ((gint *)data);
printf ("result of memc command '%s' is '%s'\n", ctx->cmd, memc_strerror (error));
/* Connect */
if (*num == 0) {
printf ("Setting value to memcached: %s -> %s\n", cur_param.key, (gchar *)cur_param.buf);
s = 1;
r = memc_set (ctx, &cur_param, &s, 60);
(*num)++;
}
else if (*num == 1) {
printf ("Getting value from memcached: %s -> %s\n", cur_param.key, (gchar *)cur_param.buf);
s = 1;
r = memc_get (ctx, &cur_param, &s);
(*num)++;
}
else {
printf ("Got value from memcached: %s -> %s\n", cur_param.key, (gchar *)cur_param.buf);
event_loopexit (NULL);
}
}


gint
main (gint argc, gchar **argv)
{
memcached_ctx_t mctx;
gchar *addr, buf[512];
gint num = 0;

event_init ();
strcpy (cur_param.key, "testkey");
strcpy (buf, "test_value");
cur_param.buf = buf;
cur_param.bufsize = sizeof ("test_value") - 1;

if (argc == 2) {
addr = argv[1];
}
else {
addr = HOST;
}

mctx.protocol = TCP_TEXT;
mctx.timeout.tv_sec = 1;
mctx.timeout.tv_usec = 0;
mctx.port = htons (PORT);
mctx.options = MEMC_OPT_DEBUG;
mctx.callback = test_memc_callback;
/* XXX: it is wrong to use local variable pointer here */
mctx.callback_data = (void *)&num;
inet_aton (addr, &mctx.addr);

memc_init_ctx (&mctx);

event_loop (0);
return 0;
}

+ 1
- 106
src/plugins/surbl.c View File

@@ -705,106 +705,6 @@ dns_callback (struct rdns_reply *reply, gpointer arg)
}
}

static void
memcached_callback (memcached_ctx_t * ctx, memc_error_t error, void *data)
{
struct memcached_param *param = (struct memcached_param *)data;
gint *url_count;

switch (ctx->op) {
case CMD_CONNECT:
if (error != OK) {
msg_info ("memcached returned error %s on CONNECT stage", memc_strerror (error));
memc_close_ctx (param->ctx);
}
else {
memc_get (param->ctx, param->ctx->param);
}
break;
case CMD_READ:
if (error != OK) {
msg_info ("memcached returned error %s on READ stage", memc_strerror (error));
memc_close_ctx (param->ctx);
}
else {
url_count = (gint *)param->ctx->param->buf;
/* Do not check DNS for urls that have count more than max_urls */
if (*url_count > (gint)surbl_module_ctx->max_urls) {
msg_info ("url '%s' has count %d, max: %d", struri (param->url), *url_count, surbl_module_ctx->max_urls);
/*
* XXX: try to understand why we should use memcached here
* insert_result (param->task, surbl_module_ctx->metric, surbl_module_ctx->symbol, 1);
*/
}
(*url_count)++;
memc_set (param->ctx, param->ctx->param, surbl_module_ctx->url_expire);
}
break;
case CMD_WRITE:
if (error != OK) {
msg_info ("memcached returned error %s on WRITE stage", memc_strerror (error));
}
memc_close_ctx (param->ctx);
make_surbl_requests (param->url, param->task, param->suffix, FALSE, param->tree);
break;
default:
return;
}
}

static void
register_memcached_call (struct uri *url, struct rspamd_task *task,
struct suffix_item *suffix, GTree *tree)
{
struct memcached_param *param;
struct memcached_server *selected;
memcached_param_t *cur_param;
gchar *sum_str;
gint *url_count;

param = rspamd_mempool_alloc (task->task_pool, sizeof (struct memcached_param));
cur_param = rspamd_mempool_alloc0 (task->task_pool, sizeof (memcached_param_t));
url_count = rspamd_mempool_alloc (task->task_pool, sizeof (gint));

param->url = url;
param->task = task;
param->suffix = suffix;
param->tree = tree;

param->ctx = rspamd_mempool_alloc0 (task->task_pool, sizeof (memcached_ctx_t));

cur_param->buf = (gchar *) url_count;
cur_param->bufsize = sizeof (gint);

sum_str = g_compute_checksum_for_string (G_CHECKSUM_MD5, struri (url), -1);
rspamd_strlcpy (cur_param->key, sum_str, sizeof (cur_param->key));
g_free (sum_str);

selected = (struct memcached_server *)get_upstream_by_hash ((void *)task->cfg->memcached_servers,
task->cfg->memcached_servers_num, sizeof (struct memcached_server),
time (NULL), task->cfg->memcached_error_time, task->cfg->memcached_dead_time, task->cfg->memcached_maxerrors, cur_param->key, strlen (cur_param->key));
if (selected == NULL) {
msg_err ("no memcached servers can be selected");
return;
}
param->ctx->callback = memcached_callback;
param->ctx->callback_data = (void *)param;
param->ctx->protocol = task->cfg->memcached_protocol;
memcpy (&param->ctx->addr, &selected->addr, sizeof (struct in_addr));
param->ctx->port = selected->port;
param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout / 1000;
param->ctx->timeout.tv_sec = task->cfg->memcached_connect_timeout - param->ctx->timeout.tv_sec * 1000;
param->ctx->sock = -1;

#ifdef WITH_DEBUG
param->ctx->options = MEMC_OPT_DEBUG;
#else
param->ctx->options = 0;
#endif
param->ctx->param = cur_param;
memc_init_ctx (param->ctx);
}

static void
free_redirector_session (void *ud)
{
@@ -990,12 +890,7 @@ surbl_tree_url_callback (gpointer key, gpointer value, void *data)
make_surbl_requests (url, param->task, param->suffix, FALSE, param->tree);
}
else {
if (param->task->worker->srv->cfg->memcached_servers_num > 0) {
register_memcached_call (url, param->task, param->suffix, param->tree);
}
else {
make_surbl_requests (url, param->task, param->suffix, FALSE, param->tree);
}
make_surbl_requests (url, param->task, param->suffix, FALSE, param->tree);
}

return FALSE;

+ 0
- 9
src/plugins/surbl.h View File

@@ -4,7 +4,6 @@
#include "config.h"
#include "main.h"
#include "cfg_file.h"
#include "memcached.h"
#include "trie.h"

#define DEFAULT_REDIRECTOR_PORT 8080
@@ -77,14 +76,6 @@ struct redirector_param {
struct suffix_item *suffix;
};

struct memcached_param {
struct uri *url;
struct rspamd_task *task;
memcached_ctx_t *ctx;
GTree *tree;
struct suffix_item *suffix;
};

struct surbl_bit_item {
guint32 bit;
const gchar *symbol;

Loading…
Cancel
Save