Browse Source

* Write test case for async memcached library

* Fix memcached async library to pass test :)
tags/0.2.7
cebka@mailsupport.rambler.ru 15 years ago
parent
commit
4ad814a6c6
7 changed files with 130 additions and 42 deletions
  1. 36
    35
      memcached.c
  2. 2
    2
      memcached.h
  3. 2
    2
      test/Makefile.in
  4. 85
    0
      test/rspamd_memcached_test.c
  5. 1
    2
      test/rspamd_test_suite.c
  6. 3
    0
      test/tests.h
  7. 1
    1
      utils/url_extracter.c

+ 36
- 35
memcached.c View File

#include <fcntl.h> #include <fcntl.h>
#include <sys/uio.h> #include <sys/uio.h>
#include <event.h> #include <event.h>
#include <glib.h>


#include "memcached.h" #include "memcached.h"


va_list args; va_list args;
if (ctx->options & MEMC_OPT_DEBUG) { if (ctx->options & MEMC_OPT_DEBUG) {
va_start (args, fmt); va_start (args, fmt);
syslog (LOG_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
vsyslog (LOG_DEBUG, fmt, args);
g_log (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, "memc_debug(%d): host: %s, port: %d", line, inet_ntoa (ctx->addr), ntohs (ctx->port));
g_logv (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, fmt, args);
va_end (args); va_end (args);
} }
} }
iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos; iov[2].iov_len = ctx->param->bufsize - ctx->param->bufpos;
iov[3].iov_base = CRLF; iov[3].iov_base = CRLF;
iov[3].iov_len = sizeof (CRLF) - 1; iov[3].iov_len = sizeof (CRLF) - 1;
ctx->param->bufpos = writev (ctx->sock, iov, 4);
writev (ctx->sock, iov, 4);
} }
else { else {
iov[0].iov_base = read_buf; iov[0].iov_base = read_buf;
iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos; iov[1].iov_len = ctx->param->bufsize - ctx->param->bufpos;
iov[2].iov_base = CRLF; iov[2].iov_base = CRLF;
iov[2].iov_len = sizeof (CRLF) - 1; iov[2].iov_len = sizeof (CRLF) - 1;
ctx->param->bufpos = writev (ctx->sock, iov, 3);
}
if (ctx->param->bufpos == ctx->param->bufsize) {
event_del (&ctx->mem_ev);
event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);
}
else {
event_del (&ctx->mem_ev);
event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);
writev (ctx->sock, iov, 3);
} }
event_del (&ctx->mem_ev);
event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout);
} }
else if (what == EV_READ) { else if (what == EV_READ) {
/* Read header */ /* Read header */
iov[1].iov_base = read_buf; iov[1].iov_base = read_buf;
iov[1].iov_len = READ_BUFSIZ; iov[1].iov_len = READ_BUFSIZ;
if ((r = readv (ctx->sock, iov, 2)) == -1) { if ((r = readv (ctx->sock, iov, 2)) == -1) {
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
event_del (&ctx->mem_ev); event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
} }
if (header.req_id != ctx->count && retries < MAX_RETRIES) { if (header.req_id != ctx->count && retries < MAX_RETRIES) {
retries ++; retries ++;
if (ctx->protocol != UDP_TEXT) { if (ctx->protocol != UDP_TEXT) {
r = read (ctx->sock, read_buf, READ_BUFSIZ - 1); r = read (ctx->sock, read_buf, READ_BUFSIZ - 1);
} }
memc_log (ctx, __LINE__, "memc_write: read reply from memcached: %s", read_buf);
/* Increment count */ /* Increment count */
ctx->count++; ctx->count++;
event_del (&ctx->mem_ev);
if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { if (strncmp (read_buf, STORED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
ctx->callback (ctx, OK, ctx->callback_data); ctx->callback (ctx, OK, ctx->callback_data);
} }
else { else {
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
} }
event_del (&ctx->mem_ev);
} }
else if (what == EV_TIMEOUT) { else if (what == EV_TIMEOUT) {
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
event_del (&ctx->mem_ev); event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
} }
} }


size_t datalen; size_t datalen;
struct memc_udp_header header; struct memc_udp_header header;
struct iovec iov[2]; struct iovec iov[2];
int retries = 0;
int retries = 0, t;
if (what == EV_WRITE) { if (what == EV_WRITE) {
/* Send command to memcached */ /* Send command to memcached */
iov[1].iov_base = read_buf; iov[1].iov_base = read_buf;
iov[1].iov_len = READ_BUFSIZ; iov[1].iov_len = READ_BUFSIZ;
if ((r = readv (ctx->sock, iov, 2)) == -1) { if ((r = readv (ctx->sock, iov, 2)) == -1) {
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
return;
} }
memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf); memc_log (ctx, __LINE__, "memc_read: got read_buf: %s", read_buf);
if (header.req_id != ctx->count && retries < MAX_RETRIES) { if (header.req_id != ctx->count && retries < MAX_RETRIES) {
if (r > 0) { if (r > 0) {
read_buf[r] = 0; read_buf[r] = 0;
if (ctx->param->bufpos == 0) { if (ctx->param->bufpos == 0) {
r = memc_parse_header (read_buf, &datalen, &p);
if (r < 0) {
t = memc_parse_header (read_buf, &datalen, &p);
if (t < 0) {
event_del (&ctx->mem_ev);
memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply"); memc_log (ctx, __LINE__, "memc_read: cannot parse memcached reply");
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
goto cleanup;
return;
} }
else if (r == 0) {
else if (t == 0) {
memc_log (ctx, __LINE__, "memc_read: record does not exists"); memc_log (ctx, __LINE__, "memc_read: record does not exists");
event_del (&ctx->mem_ev);
ctx->callback (ctx, NOT_EXISTS, ctx->callback_data); ctx->callback (ctx, NOT_EXISTS, ctx->callback_data);
goto cleanup;
return;
} }


if (datalen > ctx->param->bufsize) { if (datalen > ctx->param->bufsize) {
memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen); memc_log (ctx, __LINE__, "memc_read: user's buffer is too small: %zd, %zd required", ctx->param->bufsize, datalen);
event_del (&ctx->mem_ev);
ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data); ctx->callback (ctx, WRONG_LENGTH, ctx->callback_data);
goto cleanup;
return;
} }
/* Check if we already have all data in buffer */ /* Check if we already have all data in buffer */
if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) { if (r >= datalen + sizeof (END_TRAILER) + sizeof (CRLF) - 2) {
memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen); memcpy (ctx->param->buf + ctx->param->bufpos, p, datalen);
/* Increment count */ /* Increment count */
ctx->count++; ctx->count++;
event_del (&ctx->mem_ev);
ctx->callback (ctx, OK, ctx->callback_data); ctx->callback (ctx, OK, ctx->callback_data);
goto cleanup;
return;
} }
/* Subtract from sum parsed header's length */ /* Subtract from sum parsed header's length */
r -= p - read_buf; r -= p - read_buf;
END_TRAILER, sizeof (END_TRAILER) - 1) == 0) { END_TRAILER, sizeof (END_TRAILER) - 1) == 0) {
r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2; r -= sizeof (END_TRAILER) - sizeof (CRLF) - 2;
memcpy (ctx->param->buf + ctx->param->bufpos, p, r); memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
event_del (&ctx->mem_ev);
ctx->callback (ctx, OK, ctx->callback_data); ctx->callback (ctx, OK, ctx->callback_data);
goto cleanup;
return;
} }
/* Store this part of data in param's buffer */ /* Store this part of data in param's buffer */
memcpy (ctx->param->buf + ctx->param->bufpos, p, r); memcpy (ctx->param->buf + ctx->param->bufpos, p, r);
} }
else { else {
memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r); memc_log (ctx, __LINE__, "memc_read: read(v) failed: %d, %m", r);
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
goto cleanup;
return;
} }


ctx->count++; ctx->count++;
} }
else if (what == EV_TIMEOUT) { else if (what == EV_TIMEOUT) {
event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
goto cleanup;
} }
return;

cleanup:
event_del (&ctx->mem_ev);
} }


/* /*
iov[1].iov_base = read_buf; iov[1].iov_base = read_buf;
iov[1].iov_len = READ_BUFSIZ; iov[1].iov_len = READ_BUFSIZ;
if ((r = readv (ctx->sock, iov, 2)) == -1) { if ((r = readv (ctx->sock, iov, 2)) == -1) {
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
event_del (&ctx->mem_ev); event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
return;
} }
if (header.req_id != ctx->count && retries < MAX_RETRIES) { if (header.req_id != ctx->count && retries < MAX_RETRIES) {
retries ++; retries ++;
} }
/* Increment count */ /* Increment count */
ctx->count++; ctx->count++;
event_del (&ctx->mem_ev);
if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) { if (strncmp (read_buf, DELETED_TRAILER, sizeof (STORED_TRAILER) - 1) == 0) {
ctx->callback (ctx, OK, ctx->callback_data); ctx->callback (ctx, OK, ctx->callback_data);
} }
else { else {
ctx->callback (ctx, SERVER_ERROR, ctx->callback_data); ctx->callback (ctx, SERVER_ERROR, ctx->callback_data);
} }
event_del (&ctx->mem_ev);
} }
else if (what == EV_TIMEOUT) { else if (what == EV_TIMEOUT) {
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
event_del (&ctx->mem_ev); event_del (&ctx->mem_ev);
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
} }
} }


ctx->cmd = cmd; ctx->cmd = cmd;
ctx->op = CMD_WRITE; ctx->op = CMD_WRITE;
ctx->param = param; ctx->param = param;
param->expire = expire;
event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, &ctx->timeout); event_add (&ctx->mem_ev, &ctx->timeout);



+ 2
- 2
memcached.h View File

short options; short options;
/* Current operation */ /* Current operation */
memc_opt_t op; memc_opt_t op;
/* Event structure */
struct event mem_ev;
/* Current command */ /* Current command */
const char *cmd; const char *cmd;
/* Current param */ /* Current param */
void (*callback) (struct memcached_ctx_s *ctx, memc_error_t error, void *data); void (*callback) (struct memcached_ctx_s *ctx, memc_error_t error, void *data);
/* Data for callback function */ /* Data for callback function */
void *callback_data; void *callback_data;
/* Event structure */
struct event mem_ev;
} memcached_ctx_t; } memcached_ctx_t;


typedef void (*memcached_callback_t) (memcached_ctx_t *ctx, memc_error_t error, void *data); typedef void (*memcached_callback_t) (memcached_ctx_t *ctx, memc_error_t error, void *data);

+ 2
- 2
test/Makefile.in View File



all: rspamd_test_suite all: rspamd_test_suite


rspamd_test_suite: $(OBJECTS) ../url.o ../util.o
$(CC) $(PTHREAD_LDFLAGS) $(LDFLAGS) $(OBJECTS) ../url.o ../util.o $(LIBS) -o rspamd_test_suite
rspamd_test_suite: $(OBJECTS) ../url.o ../util.o ../memcached.o
$(CC) $(PTHREAD_LDFLAGS) $(LDFLAGS) $(OBJECTS) ../url.o ../util.o ../memcached.o $(LIBS) -o rspamd_test_suite


run_test: rspamd_test_suite run_test: rspamd_test_suite
gtester --verbose -k -o=rspamd_test.xml ./rspamd_test_suite gtester --verbose -k -o=rspamd_test.xml ./rspamd_test_suite

+ 85
- 0
test/rspamd_memcached_test.c View File

#include <sys/types.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sys/param.h>

#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <syslog.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <event.h>

#include "../config.h"
#include "../main.h"
#include "../cfg_file.h"
#include "../memcached.h"
#include "tests.h"

u_char *buf = "test";

static void
memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
{
struct timeval tv;

switch (ctx->op) {
case CMD_CONNECT:
g_assert (error == OK);
msg_debug ("Connect ok");
memc_set (ctx, ctx->param, 60);
break;
case CMD_READ:
g_assert (error == OK);
g_assert (!strcmp(ctx->param->buf, buf));
msg_debug ("Read ok");
memc_close_ctx (ctx);
tv.tv_sec = 0;
tv.tv_usec = 0;
event_loopexit (&tv);
break;
case CMD_WRITE:
g_assert (error == OK);
msg_debug ("Write ok");
ctx->param->buf = g_malloc (sizeof (buf));
bzero (ctx->param->buf, sizeof (buf));
memc_get (ctx, ctx->param);
break;
}
}
void
rspamd_memcached_test_func ()
{
memcached_ctx_t *ctx;
memcached_param_t *param;
struct in_addr addr;

ctx = g_malloc (sizeof (memcached_ctx_t));
param = g_malloc (sizeof (memcached_param_t));
bzero (ctx, sizeof (memcached_ctx_t));
bzero (param, sizeof (memcached_param_t));

event_init ();

ctx->callback = memcached_callback;
ctx->callback_data = (void *)param;
ctx->protocol = TCP_TEXT;
inet_aton ("127.0.0.1", &addr);
memcpy (&ctx->addr, &addr, sizeof (struct in_addr));
ctx->port = htons (11211);
ctx->timeout.tv_sec = 1;
ctx->timeout.tv_usec = 0;
ctx->sock = -1;
ctx->options = MEMC_OPT_DEBUG;
strlcpy (param->key, buf, sizeof (param->key));
param->buf = buf;
param->bufsize = strlen (buf);
ctx->param = param;
g_assert (memc_init_ctx (ctx) != -1);

event_loop (0);
}


+ 1
- 2
test/rspamd_test_suite.c View File

g_test_init (&argc, &argv, NULL); g_test_init (&argc, &argv, NULL);


g_test_add_func ("/rspamd/url", rspamd_url_test_func); g_test_add_func ("/rspamd/url", rspamd_url_test_func);
g_test_add_func ("/rspamd/memcached", rspamd_memcached_test_func);


g_test_run (); g_test_run ();
g_mem_profile ();
} }

+ 3
- 0
test/tests.h View File

/* URL parser test */ /* URL parser test */
void rspamd_url_test_func (); void rspamd_url_test_func ();


/* Memceched library test */
void rspamd_memcached_test_func ();

#endif #endif

+ 1
- 1
utils/url_extracter.c View File

GMimeDataWrapper *wrapper; GMimeDataWrapper *wrapper;
GMimeStream *part_stream; GMimeStream *part_stream;
GByteArray *part_content; GByteArray *part_content;
GMimeMessage *message;
/* 'part' points to the current part node that g_mime_message_foreach_part() is iterating over */ /* 'part' points to the current part node that g_mime_message_foreach_part() is iterating over */
if (GMIME_IS_MESSAGE_PART (part)) { if (GMIME_IS_MESSAGE_PART (part)) {
/* message/rfc822 or message/news */ /* message/rfc822 or message/news */
printf ("Message part found\n"); printf ("Message part found\n");
GMimeMessage *message;
/* g_mime_message_foreach_part() won't descend into /* g_mime_message_foreach_part() won't descend into
child message parts, so if we want to count any child message parts, so if we want to count any

Loading…
Cancel
Save