aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-08-20 17:09:13 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-08-20 17:09:13 +0400
commite90352d20a0d5f615c906b7719f95599cb2aaeac (patch)
tree61c2a5a7efc21fb03b4ba9a9779af8e76e641702
parent86fd197a95f72d09e5cccbc059829ff5f6b03b87 (diff)
downloadrspamd-e90352d20a0d5f615c906b7719f95599cb2aaeac.tar.gz
rspamd-e90352d20a0d5f615c906b7719f95599cb2aaeac.zip
* Rewrite perl module for new XS, now perl filters got only object of rspamd module that provides access to every part of message
* Add memcached access api for perl * Reorganize Makefile, add .PHONY targets * Fix memcached module as now we cannot write or read several keys in async mode
-rw-r--r--Makefile.in7
-rw-r--r--main.h4
-rw-r--r--memcached.c62
-rw-r--r--memcached.h36
-rw-r--r--perl.c61
-rw-r--r--perl.h14
-rw-r--r--perl/rspamd.xs127
-rw-r--r--worker.c15
8 files changed, 239 insertions, 87 deletions
diff --git a/Makefile.in b/Makefile.in
index 65cd1a405..df7400786 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -1,9 +1,13 @@
+.PHONY: perl clean
all: perl $(TARGETS)
-perl:
+perl: perl/Makefile
cd perl && make && cd ..
+perl/Makefile: perl/Makefile.PL
+ cd perl && perl Makefile.PL && cd ..
+
memctest: upstream.c memcached.c memcached-test.c
$(CC) $(OPT_FLAGS) $(CFLAGS) $(PTHREAD_CFLAGS) -c upstream.c
$(CC) $(OPT_FLAGS) $(CFLAGS) $(PTHREAD_CFLAGS) -c memcached.c
@@ -11,6 +15,7 @@ memctest: upstream.c memcached.c memcached-test.c
$(CC) $(OPT_FLAGS) $(PTHREAD_LDFLAGS) $(LD_PATH) upstream.o memcached.o memcached-test.o $(LIBS) -o memcached-test
install: $(EXEC)
+ cd perl && make install && cd ..
$(INSTALL) -b $(EXEC) $(PREFIX)/sbin/$(EXEC)
$(INSTALL) -v $(EXEC).sh $(PREFIX)/etc/rc.d
#$(INSTALL) -m0644 rspamd.8 $(MANPATH)/man8
diff --git a/main.h b/main.h
index b2441bbe7..cb98167ae 100644
--- a/main.h
+++ b/main.h
@@ -21,6 +21,7 @@
#include "fstring.h"
#include "url.h"
+#include "memcached.h"
#include <glib.h>
#include <gmime/gmime.h>
@@ -100,6 +101,9 @@ struct worker_task {
struct in_addr from_addr;
f_str_buf_t *msg;
struct bufferevent *bev;
+ /* Memcached connection for this task */
+ memcached_ctx_t *memc_ctx;
+ unsigned memc_busy:1;
/* Number of mime parts */
int parts_count;
/* Headers */
diff --git a/memcached.c b/memcached.c
index 566fe659f..3228d791f 100644
--- a/memcached.c
+++ b/memcached.c
@@ -395,9 +395,11 @@ socket_callback (int fd, short what, void *arg)
event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, NULL);
ctx->callback (ctx, OK, ctx->callback_data);
+ ctx->alive = 1;
}
else {
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ ctx->alive = 0;
}
break;
case CMD_WRITE:
@@ -538,16 +540,13 @@ memc_parse_header (char *buf, size_t *len, char **end)
* Common read command handler for memcached
*/
memc_error_t
-memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem)
+memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param)
{
- int i;
- for (i = 0; i < *nelem; i++) {
- ctx->cmd = cmd;
- ctx->op = CMD_READ;
- ctx->param = &params[i];
- event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
- event_add (&ctx->mem_ev, &ctx->timeout);
- }
+ ctx->cmd = cmd;
+ ctx->op = CMD_READ;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
return OK;
}
@@ -556,16 +555,13 @@ memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, siz
* Common write command handler for memcached
*/
memc_error_t
-memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire)
+memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire)
{
- int i;
- for (i = 0; i < *nelem; i++) {
- ctx->cmd = cmd;
- ctx->op = CMD_WRITE;
- ctx->param = &params[i];
- event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
- event_add (&ctx->mem_ev, &ctx->timeout);
- }
+ ctx->cmd = cmd;
+ ctx->op = CMD_WRITE;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
return OK;
}
@@ -573,17 +569,13 @@ memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, si
* Delete command handler
*/
memc_error_t
-memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem)
+memc_delete (memcached_ctx_t *ctx, memcached_param_t *param)
{
- int i;
-
- for (i = 0; i < *nelem; i++) {
- ctx->cmd = "delete";
- ctx->op = CMD_DELETE;
- ctx->param = &params[i];
- event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
- event_add (&ctx->mem_ev, &ctx->timeout);
- }
+ ctx->cmd = "delete";
+ ctx->op = CMD_DELETE;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
return OK;
}
@@ -593,13 +585,13 @@ memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem)
* writing is done to each memcached server
*/
memc_error_t
-memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem, int expire)
+memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire)
{
memc_error_t r, result = OK;
while (memcached_num --) {
if (ctx[memcached_num].alive == 1) {
- r = memc_write (&ctx[memcached_num], cmd, params, nelem, expire);
+ r = memc_write (&ctx[memcached_num], cmd, param, expire);
if (r != OK) {
memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r));
result = r;
@@ -616,13 +608,13 @@ memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd,
* reading is done from first active memcached server
*/
memc_error_t
-memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem)
+memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
{
memc_error_t r, result = OK;
while (memcached_num --) {
if (ctx[memcached_num].alive == 1) {
- r = memc_read (&ctx[memcached_num], cmd, params, nelem);
+ r = memc_read (&ctx[memcached_num], cmd, param);
if (r != OK) {
result = r;
if (r != NOT_EXISTS) {
@@ -647,13 +639,13 @@ memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, m
* deleting is done for each active memcached server
*/
memc_error_t
-memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem)
+memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
{
memc_error_t r, result = OK;
while (memcached_num --) {
if (ctx[memcached_num].alive == 1) {
- r = memc_delete (&ctx[memcached_num], params, nelem);
+ r = memc_delete (&ctx[memcached_num], param);
if (r != OK) {
result = r;
if (r != NOT_EXISTS) {
@@ -679,7 +671,7 @@ memc_init_ctx (memcached_ctx_t *ctx)
}
ctx->count = 0;
- ctx->alive = 1;
+ ctx->alive = 0;
ctx->op = CMD_NULL;
/* Set default callback */
if (ctx->callback == NULL) {
diff --git a/memcached.h b/memcached.h
index f3872456d..c815884a3 100644
--- a/memcached.h
+++ b/memcached.h
@@ -108,29 +108,29 @@ int memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num);
* "prepend" means "add this data to an existing key before existing data".
*/
-#define memc_get(ctx, params, nelem) memc_read(ctx, "get", params, nelem)
-#define memc_set(ctx, params, nelem, expire) memc_write(ctx, "set", params, nelem, expire)
-#define memc_add(ctx, params, nelem, expire) memc_write(ctx, "add", params, nelem, expire)
-#define memc_replace(ctx, params, nelem, expire) memc_write(ctx, "replace", params, nelem, expire)
-#define memc_append(ctx, params, nelem, expire) memc_write(ctx, "append", params, nelem, expire)
-#define memc_prepend(ctx, params, nelem, expire) memc_write(ctx, "prepend", params, nelem, expire)
+#define memc_get(ctx, param) memc_read(ctx, "get", param)
+#define memc_set(ctx, param, expire) memc_write(ctx, "set", param, expire)
+#define memc_add(ctx, param, expire) memc_write(ctx, "add", param, expire)
+#define memc_replace(ctx, param, expire) memc_write(ctx, "replace", param, expire)
+#define memc_append(ctx, param, expire) memc_write(ctx, "append", param, expire)
+#define memc_prepend(ctx, param, expire) memc_write(ctx, "prepend", param, expire)
/* Functions that works with mirror of memcached servers */
-#define memc_get_mirror(ctx, num, params, nelem) memc_read_mirror(ctx, num, "get", params, nelem)
-#define memc_set_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "set", params, nelem, expire)
-#define memc_add_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "add", params, nelem, expire)
-#define memc_replace_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "replace", params, nelem, expire)
-#define memc_append_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "append", params, nelem, expire)
-#define memc_prepend_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "prepend", params, nelem, expire)
+#define memc_get_mirror(ctx, num, param) memc_read_mirror(ctx, num, "get", param)
+#define memc_set_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "set", param, expire)
+#define memc_add_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "add", param, expire)
+#define memc_replace_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "replace", param, expire)
+#define memc_append_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "append", param, expire)
+#define memc_prepend_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "prepend", param, expire)
-memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem);
-memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire);
-memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem);
+memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param);
+memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire);
+memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params);
-memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem, int expire);
-memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem);
-memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem);
+memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire);
+memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param);
+memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param);
/* Return symbolic name of memcached error*/
const char * memc_strerror (memc_error_t err);
diff --git a/perl.c b/perl.c
index e954292a3..e2ae086eb 100644
--- a/perl.c
+++ b/perl.c
@@ -17,7 +17,7 @@
extern PerlInterpreter *my_perl;
int
-perl_call_header_filter (const char *function, const char *header_name, const char *header_value)
+perl_call_header_filter (const char *function, struct worker_task *task)
{
int result;
dSP;
@@ -26,8 +26,7 @@ perl_call_header_filter (const char *function, const char *header_name, const ch
SAVETMPS;
PUSHMARK (SP);
- XPUSHs (sv_2mortal (newSVpv (header_name, 0)));
- XPUSHs (sv_2mortal (newSVpv (header_value, 0)));
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
@@ -35,7 +34,7 @@ perl_call_header_filter (const char *function, const char *header_name, const ch
SPAGAIN;
result = POPi;
- msg_debug ("header_filter: call of %s with header %s returned mark %d\n", function, header_name, result);
+ msg_debug ("header_filter: call of %s with returned mark %d\n", function, result);
PUTBACK;
FREETMPS;
@@ -45,7 +44,7 @@ perl_call_header_filter (const char *function, const char *header_name, const ch
}
int
-perl_call_mime_filter (const char *function, GByteArray *content)
+perl_call_mime_filter (const char *function, struct worker_task *task)
{
int result;
dSP;
@@ -54,7 +53,7 @@ perl_call_mime_filter (const char *function, GByteArray *content)
SAVETMPS;
PUSHMARK (SP);
- XPUSHs (sv_2mortal (newSVpv (content->data, content->len)));
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
@@ -72,7 +71,7 @@ perl_call_mime_filter (const char *function, GByteArray *content)
}
int
-perl_call_message_filter (const char *function, GByteArray *content)
+perl_call_message_filter (const char *function, struct worker_task *task)
{
int result;
dSP;
@@ -81,7 +80,7 @@ perl_call_message_filter (const char *function, GByteArray *content)
SAVETMPS;
PUSHMARK (SP);
- XPUSHs (sv_2mortal (newSVpv (content->data, content->len)));
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
@@ -99,7 +98,7 @@ perl_call_message_filter (const char *function, GByteArray *content)
}
int
-perl_call_url_filter (const char *function, struct uri *uri)
+perl_call_url_filter (const char *function, struct worker_task *task)
{
int result;
dSP;
@@ -107,15 +106,8 @@ perl_call_url_filter (const char *function, struct uri *uri)
ENTER;
SAVETMPS;
- /* URL:
- * url,
- * host,
- * data
- */
PUSHMARK (SP);
- XPUSHs (sv_2mortal (newSVpv (uri->string, 0)));
- XPUSHs (sv_2mortal (newSVpv (uri->host, uri->hostlen)));
- XPUSHs (sv_2mortal (newSVpv (uri->data, uri->datalen)));
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
@@ -123,7 +115,7 @@ perl_call_url_filter (const char *function, struct uri *uri)
SPAGAIN;
result = POPi;
- msg_debug ("url_filter: call of %s for url '%s' returned mark %d\n", function, uri->string, result);
+ msg_debug ("url_filter: call of %s for url returned mark %d\n", function, result);
PUTBACK;
FREETMPS;
@@ -133,18 +125,16 @@ perl_call_url_filter (const char *function, struct uri *uri)
}
int
-perl_call_chain_filter (const char *function, GArray *results)
+perl_call_chain_filter (const char *function, struct worker_task *task)
{
- int result, i;
+ int result;
dSP;
ENTER;
SAVETMPS;
PUSHMARK (SP);
- for (i = 0; i < results->len; i ++) {
- XPUSHs (sv_2mortal (newSViv (g_array_index (results, int, i))));
- }
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
@@ -161,3 +151,28 @@ perl_call_chain_filter (const char *function, GArray *results)
return result;
}
+
+void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ struct {
+ SV *callback;
+ struct worker_task *task;
+ } *callback_data = data;
+
+ dSP;
+
+ ENTER;
+ SAVETMPS;
+ PUSHMARK (SP);
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (callback_data->task))));
+ XPUSHs (sv_2mortal (newSViv (error)));
+ XPUSHs (sv_2mortal (newSVpv (ctx->param->buf, ctx->param->bufsize)));
+ PUTBACK;
+
+ call_sv (callback_data->callback, G_SCALAR);
+
+ SPAGAIN;
+ FREETMPS;
+ LEAVE;
+
+}
diff --git a/perl.h b/perl.h
index 3a659bbe1..fc2ae9321 100644
--- a/perl.h
+++ b/perl.h
@@ -3,13 +3,17 @@
#include <sys/types.h>
#include <glib.h>
+#include "memcached.h"
struct uri;
+struct worker_task;
-int perl_call_header_filter (const char *function, const char *header_name, const char *header_value);
-int perl_call_mime_filter (const char *function, GByteArray *content);
-int perl_call_message_filter (const char *function, GByteArray *content);
-int perl_call_url_filter (const char *function, struct uri *uri);
-int perl_call_chain_filter (const char *function, GArray *results);
+int perl_call_header_filter (const char *function, struct worker_task *task);
+int perl_call_mime_filter (const char *function, struct worker_task *task);
+int perl_call_message_filter (const char *function, struct worker_task *task);
+int perl_call_url_filter (const char *function, struct worker_task *task);
+int perl_call_chain_filter (const char *function, struct worker_task *task);
+
+void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data);
#endif
diff --git a/perl/rspamd.xs b/perl/rspamd.xs
index ada7a5997..681e40308 100644
--- a/perl/rspamd.xs
+++ b/perl/rspamd.xs
@@ -6,12 +6,13 @@
#include <sys/types.h>
#include <unistd.h>
+#include <EXTERN.h>
+#include <perl.h>
+#include <XSUB.h>
+
#include "../config.h"
#include "../main.h"
-
-#include "EXTERN.h"
-#include "perl.h"
-#include "XSUB.h"
+#include "../perl.h"
#define perl_set_session(r) \
r = INT2PTR(struct worker_task *, SvIV((SV *) SvRV(ST(0))))
@@ -98,3 +99,121 @@ get_part (r, num)
OUTPUT:
RETVAL
+void
+read_memcached_key (r, key, datalen, callback)
+ CODE:
+ struct worker_task *r;
+ char *key;
+ unsigned int datalen;
+ SV *callback;
+ STRLEN keylen;
+ struct _param {
+ SV *callback;
+ struct worker_task *task;
+ } *callback_data;
+ memcached_param_t param;
+
+ perl_set_session (r);
+ key = (char *) SvPV (ST(1), keylen);
+ datalen = (unsigned int) SvIV (ST(2));
+ callback = SvRV(ST(3));
+
+ r->memc_ctx->callback = perl_call_memcached_callback;
+ callback_data = malloc (sizeof (struct _param));
+ if (callback_data == NULL) {
+ XSRETURN_UNDEF;
+ }
+ callback_data->callback = callback;
+ callback_data->task = r;
+ r->memc_ctx->callback_data = (void *)callback_data;
+
+ r->memc_busy = 1;
+
+ strlcpy (param.key, key, sizeof (param.key));
+ param.buf = malloc (datalen);
+ if (param.buf != NULL) {
+ param.bufsize = datalen;
+ }
+ param.bufpos = 0;
+ param.expire = 0;
+
+ memc_get (r->memc_ctx, &param);
+ XSRETURN_EMPTY;
+
+void
+write_memcached_key (r, key, data, expire, callback)
+ CODE:
+ struct worker_task *r;
+ char *key, *data;
+ SV *callback;
+ STRLEN keylen, datalen;
+ int expire;
+ struct _param {
+ SV *callback;
+ struct worker_task *task;
+ } *callback_data;
+ memcached_param_t param;
+
+ perl_set_session (r);
+ key = (char *) SvPV (ST(1), keylen);
+ data = (char *) SvPV (ST(2), datalen);
+ expire = (int) SvIV (ST(3));
+ callback = SvRV(ST(4));
+
+ r->memc_ctx->callback = perl_call_memcached_callback;
+ callback_data = malloc (sizeof (struct _param));
+ if (callback_data == NULL) {
+ XSRETURN_UNDEF;
+ }
+ callback_data->callback = callback;
+ callback_data->task = r;
+ r->memc_ctx->callback_data = (void *)callback_data;
+
+ r->memc_busy = 1;
+
+ strlcpy (param.key, key, sizeof (param.key));
+ param.buf = data;
+ param.bufsize = datalen;
+ param.bufpos = 0;
+ param.expire = expire;
+
+ memc_set (r->memc_ctx, &param, expire);
+ XSRETURN_EMPTY;
+
+void
+delete_memcached_key (r, key, callback)
+ CODE:
+ struct worker_task *r;
+ char *key;
+ SV *callback;
+ STRLEN keylen;
+ struct _param {
+ SV *callback;
+ struct worker_task *task;
+ } *callback_data;
+ memcached_param_t param;
+
+ perl_set_session (r);
+ key = (char *) SvPV (ST(1), keylen);
+ callback = SvRV(ST(2));
+
+ r->memc_ctx->callback = perl_call_memcached_callback;
+ callback_data = malloc (sizeof (struct _param));
+ if (callback_data == NULL) {
+ XSRETURN_UNDEF;
+ }
+ callback_data->callback = callback;
+ callback_data->task = r;
+ r->memc_ctx->callback_data = (void *)callback_data;
+
+ r->memc_busy = 1;
+
+ strlcpy (param.key, key, sizeof (param.key));
+ param.buf = NULL;
+ param.bufsize = 0;
+ param.bufpos = 0;
+ param.expire = 0;
+
+ memc_delete (r->memc_ctx, &param);
+ XSRETURN_EMPTY;
+
diff --git a/worker.c b/worker.c
index 0b3dd30bc..d45284392 100644
--- a/worker.c
+++ b/worker.c
@@ -92,6 +92,10 @@ free_task (struct worker_task *task)
if (task->rcpt) {
free (task->rcpt);
}
+ if (task->memc_ctx) {
+ memc_close_ctx (task->memc_ctx);
+ free (task->memc_ctx);
+ }
while (!TAILQ_EMPTY (&task->urls)) {
cur = TAILQ_FIRST (&task->urls);
TAILQ_REMOVE (&task->urls, cur, next);
@@ -394,7 +398,7 @@ accept_socket (int fd, short what, void *arg)
new_task = malloc (sizeof (struct worker_task));
if (new_task == NULL) {
- msg_err ("accept_socket: cannot allocate memory for task");
+ msg_err ("accept_socket: cannot allocate memory for task, %m");
return;
}
new_task->worker = worker;
@@ -404,6 +408,15 @@ accept_socket (int fd, short what, void *arg)
TAILQ_INIT (&new_task->urls);
TAILQ_INIT (&new_task->results);
TAILQ_INIT (&new_task->parts);
+ new_task->memc_ctx = malloc (sizeof (memcached_ctx_t));
+ if (new_task->memc_ctx == NULL) {
+ msg_err ("accept_socket: cannot allocate memory for memcached ctx, %m");
+ }
+ else {
+ if (memc_init_ctx (new_task->memc_ctx) == -1) {
+ msg_err ("accept_socket: cannot init memcached context for task");
+ }
+ }
/* Read event */
new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);