]> source.dussan.org Git - rspamd.git/commitdiff
* Rewrite perl module for new XS, now perl filters got only object of rspamd module...
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 20 Aug 2008 13:09:13 +0000 (17:09 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Wed, 20 Aug 2008 13:09:13 +0000 (17:09 +0400)
* Add memcached access api for perl
* Reorganize Makefile, add .PHONY targets
* Fix memcached module as now we cannot write or read several keys in async mode

Makefile.in
main.h
memcached.c
memcached.h
perl.c
perl.h
perl/rspamd.xs
worker.c

index 65cd1a405ed1b641ec4d491acb4cc8db0944c033..df7400786a08c6fe812597c8b391e6bfc3159391 100644 (file)
@@ -1,9 +1,13 @@
+.PHONY: perl clean
 
 all: perl $(TARGETS)
 
-perl:
+perl: perl/Makefile
        cd perl && make && cd ..
 
+perl/Makefile: perl/Makefile.PL
+       cd perl && perl Makefile.PL && cd ..
+
 memctest: upstream.c memcached.c memcached-test.c
        $(CC) $(OPT_FLAGS) $(CFLAGS) $(PTHREAD_CFLAGS) -c upstream.c
        $(CC) $(OPT_FLAGS) $(CFLAGS) $(PTHREAD_CFLAGS) -c memcached.c
@@ -11,6 +15,7 @@ memctest: upstream.c memcached.c memcached-test.c
        $(CC) $(OPT_FLAGS) $(PTHREAD_LDFLAGS) $(LD_PATH) upstream.o memcached.o memcached-test.o $(LIBS) -o memcached-test
 
 install: $(EXEC)
+       cd perl && make install && cd ..
        $(INSTALL) -b $(EXEC) $(PREFIX)/sbin/$(EXEC)
        $(INSTALL) -v $(EXEC).sh $(PREFIX)/etc/rc.d
        #$(INSTALL) -m0644 rspamd.8 $(MANPATH)/man8
diff --git a/main.h b/main.h
index b2441bbe704999e7422cdcab1f37a74b5d52267c..cb98167ae7f859935cfa708d8fca671f77c7cf89 100644 (file)
--- a/main.h
+++ b/main.h
@@ -21,6 +21,7 @@
 
 #include "fstring.h"
 #include "url.h"
+#include "memcached.h"
 
 #include <glib.h>
 #include <gmime/gmime.h>
@@ -100,6 +101,9 @@ struct worker_task {
        struct in_addr from_addr;
        f_str_buf_t *msg;
        struct bufferevent *bev;
+       /* Memcached connection for this task */
+       memcached_ctx_t *memc_ctx;
+       unsigned memc_busy:1;
        /* Number of mime parts */
        int parts_count;
        /* Headers */
index 566fe659fcca45b283184cd2b096a52f89ff32f2..3228d791fc5a40623f62033fe63fc144184cbbbc 100644 (file)
@@ -395,9 +395,11 @@ socket_callback (int fd, short what, void *arg)
                                event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
                                event_add (&ctx->mem_ev, NULL);
                                ctx->callback (ctx, OK, ctx->callback_data);
+                ctx->alive = 1;
                        }
                        else {
                                ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+                ctx->alive = 0;
                        }
                        break;
                case CMD_WRITE:
@@ -538,16 +540,13 @@ memc_parse_header (char *buf, size_t *len, char **end)
  * Common read command handler for memcached
  */
 memc_error_t
-memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem)
+memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param)
 {
-       int i;
-       for (i = 0; i < *nelem; i++) {
-               ctx->cmd = cmd;
-               ctx->op = CMD_READ;
-               ctx->param = &params[i];
-               event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
-               event_add (&ctx->mem_ev, &ctx->timeout);
-       }
+       ctx->cmd = cmd;
+       ctx->op = CMD_READ;
+       ctx->param = param;
+       event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+       event_add (&ctx->mem_ev, &ctx->timeout);
 
        return OK;
 }
@@ -556,16 +555,13 @@ memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, siz
  * Common write command handler for memcached
  */
 memc_error_t
-memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire)
+memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire)
 {
-       int i;  
-       for (i = 0; i < *nelem; i++) {
-               ctx->cmd = cmd;
-               ctx->op = CMD_WRITE;
-               ctx->param = &params[i];
-               event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
-               event_add (&ctx->mem_ev, &ctx->timeout);
-       }
+       ctx->cmd = cmd;
+       ctx->op = CMD_WRITE;
+       ctx->param = param;
+       event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+       event_add (&ctx->mem_ev, &ctx->timeout);
 
        return OK;
 }
@@ -573,17 +569,13 @@ memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, si
  * Delete command handler
  */
 memc_error_t
-memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem)
+memc_delete (memcached_ctx_t *ctx, memcached_param_t *param)
 {
-       int i;
-       
-       for (i = 0; i < *nelem; i++) {
-               ctx->cmd = "delete";
-               ctx->op = CMD_DELETE;
-               ctx->param = &params[i];
-               event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
-               event_add (&ctx->mem_ev, &ctx->timeout);
-       }
+       ctx->cmd = "delete";
+       ctx->op = CMD_DELETE;
+       ctx->param = param;
+       event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+       event_add (&ctx->mem_ev, &ctx->timeout);
 
        return OK;
 }
@@ -593,13 +585,13 @@ memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem)
  * writing is done to each memcached server
  */
 memc_error_t
-memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem, int expire)
+memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire)
 {
        memc_error_t r, result = OK;
 
        while (memcached_num --) {
                if (ctx[memcached_num].alive == 1) {
-                       r = memc_write (&ctx[memcached_num], cmd, params, nelem, expire);
+                       r = memc_write (&ctx[memcached_num], cmd, param, expire);
                        if (r != OK) {
                                memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r));
                                result = r;
@@ -616,13 +608,13 @@ memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd,
  * reading is done from first active memcached server
  */
 memc_error_t
-memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem)
+memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
 {
        memc_error_t r, result = OK;
 
        while (memcached_num --) {
                if (ctx[memcached_num].alive == 1) {
-                       r = memc_read (&ctx[memcached_num], cmd, params, nelem);
+                       r = memc_read (&ctx[memcached_num], cmd, param);
                        if (r != OK) {
                                result = r;
                                if (r != NOT_EXISTS) {
@@ -647,13 +639,13 @@ memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, m
  * deleting is done for each active memcached server
  */
 memc_error_t
-memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem)
+memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
 {
        memc_error_t r, result = OK;
 
        while (memcached_num --) {
                if (ctx[memcached_num].alive == 1) {
-                       r = memc_delete (&ctx[memcached_num], params, nelem);
+                       r = memc_delete (&ctx[memcached_num], param);
                        if (r != OK) {
                                result = r;
                                if (r != NOT_EXISTS) {
@@ -679,7 +671,7 @@ memc_init_ctx (memcached_ctx_t *ctx)
        }
 
        ctx->count = 0;
-       ctx->alive = 1;
+       ctx->alive = 0;
        ctx->op = CMD_NULL;
        /* Set default callback */
        if (ctx->callback == NULL) {
index f3872456dda481108cbe64faf362eb9ad51b1441..c815884a3d7f007f73cc76a17a7d40e48a6b834a 100644 (file)
@@ -108,29 +108,29 @@ int memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num);
 
  * "prepend" means "add this data to an existing key before existing data".
  */
-#define memc_get(ctx, params, nelem) memc_read(ctx, "get", params, nelem)
-#define memc_set(ctx, params, nelem, expire) memc_write(ctx, "set", params, nelem, expire)
-#define memc_add(ctx, params, nelem, expire) memc_write(ctx, "add", params, nelem, expire)
-#define memc_replace(ctx, params, nelem, expire) memc_write(ctx, "replace", params, nelem, expire)
-#define memc_append(ctx, params, nelem, expire) memc_write(ctx, "append", params, nelem, expire)
-#define memc_prepend(ctx, params, nelem, expire) memc_write(ctx, "prepend", params, nelem, expire)
+#define memc_get(ctx, param) memc_read(ctx, "get", param)
+#define memc_set(ctx, param, expire) memc_write(ctx, "set", param, expire)
+#define memc_add(ctx, param, expire) memc_write(ctx, "add", param, expire)
+#define memc_replace(ctx, param, expire) memc_write(ctx, "replace", param, expire)
+#define memc_append(ctx, param, expire) memc_write(ctx, "append", param, expire)
+#define memc_prepend(ctx, param, expire) memc_write(ctx, "prepend", param, expire)
 
 /* Functions that works with mirror of memcached servers */
-#define memc_get_mirror(ctx, num, params, nelem) memc_read_mirror(ctx, num, "get", params, nelem)
-#define memc_set_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "set", params, nelem, expire)
-#define memc_add_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "add", params, nelem, expire)
-#define memc_replace_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "replace", params, nelem, expire)
-#define memc_append_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "append", params, nelem, expire)
-#define memc_prepend_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "prepend", params, nelem, expire)
+#define memc_get_mirror(ctx, num, param) memc_read_mirror(ctx, num, "get", param)
+#define memc_set_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "set", param, expire)
+#define memc_add_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "add", param, expire)
+#define memc_replace_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "replace", param, expire)
+#define memc_append_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "append", param, expire)
+#define memc_prepend_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "prepend", param, expire)
 
 
-memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem);
-memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire);
-memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem);
+memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param);
+memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire);
+memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params);
 
-memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem, int expire);
-memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem);
-memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem);
+memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire);
+memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param);
+memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param);
 
 /* Return symbolic name of memcached error*/
 const char * memc_strerror (memc_error_t err);
diff --git a/perl.c b/perl.c
index e954292a3e45249935411371da9dce3876ef1b39..e2ae086ebf609f161956a860c855c3b890742d31 100644 (file)
--- a/perl.c
+++ b/perl.c
@@ -17,7 +17,7 @@
 extern PerlInterpreter *my_perl;
 
 int
-perl_call_header_filter (const char *function, const char *header_name, const char *header_value)
+perl_call_header_filter (const char *function, struct worker_task *task)
 {
        int result;
        dSP;
@@ -26,8 +26,7 @@ perl_call_header_filter (const char *function, const char *header_name, const ch
        SAVETMPS;
 
        PUSHMARK (SP);
-       XPUSHs (sv_2mortal (newSVpv (header_name, 0)));
-       XPUSHs (sv_2mortal (newSVpv (header_value, 0)));
+       XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
        PUTBACK;
        
        call_pv (function, G_SCALAR);
@@ -35,7 +34,7 @@ perl_call_header_filter (const char *function, const char *header_name, const ch
        SPAGAIN;
 
        result = POPi;
-       msg_debug ("header_filter: call of %s with header %s returned mark %d\n", function, header_name, result);
+       msg_debug ("header_filter: call of %s with returned mark %d\n", function, result);
 
        PUTBACK;
        FREETMPS;
@@ -45,7 +44,7 @@ perl_call_header_filter (const char *function, const char *header_name, const ch
 }
 
 int
-perl_call_mime_filter (const char *function, GByteArray *content)
+perl_call_mime_filter (const char *function, struct worker_task *task)
 {
        int result;
        dSP;
@@ -54,7 +53,7 @@ perl_call_mime_filter (const char *function, GByteArray *content)
        SAVETMPS;
 
        PUSHMARK (SP);
-       XPUSHs (sv_2mortal (newSVpv (content->data, content->len)));
+       XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
        PUTBACK;
        
        call_pv (function, G_SCALAR);
@@ -72,7 +71,7 @@ perl_call_mime_filter (const char *function, GByteArray *content)
 }
 
 int
-perl_call_message_filter (const char *function, GByteArray *content)
+perl_call_message_filter (const char *function, struct worker_task *task)
 {
        int result;
        dSP;
@@ -81,7 +80,7 @@ perl_call_message_filter (const char *function, GByteArray *content)
        SAVETMPS;
 
        PUSHMARK (SP);
-       XPUSHs (sv_2mortal (newSVpv (content->data, content->len)));
+       XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
        PUTBACK;
        
        call_pv (function, G_SCALAR);
@@ -99,7 +98,7 @@ perl_call_message_filter (const char *function, GByteArray *content)
 }
 
 int
-perl_call_url_filter (const char *function, struct uri *uri)
+perl_call_url_filter (const char *function, struct worker_task *task)
 {
        int result;
        dSP;
@@ -107,15 +106,8 @@ perl_call_url_filter (const char *function, struct uri *uri)
        ENTER;
        SAVETMPS;
        
-       /* URL:
-        * url,
-        * host,
-        * data
-        */
        PUSHMARK (SP);
-       XPUSHs (sv_2mortal (newSVpv (uri->string, 0)));
-       XPUSHs (sv_2mortal (newSVpv (uri->host, uri->hostlen)));
-       XPUSHs (sv_2mortal (newSVpv (uri->data, uri->datalen)));
+       XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
        PUTBACK;
        
        call_pv (function, G_SCALAR);
@@ -123,7 +115,7 @@ perl_call_url_filter (const char *function, struct uri *uri)
        SPAGAIN;
 
        result = POPi;
-       msg_debug ("url_filter: call of %s for url '%s' returned mark %d\n", function, uri->string, result);
+       msg_debug ("url_filter: call of %s for url returned mark %d\n", function, result);
 
        PUTBACK;
        FREETMPS;
@@ -133,18 +125,16 @@ perl_call_url_filter (const char *function, struct uri *uri)
 }
 
 int
-perl_call_chain_filter (const char *function, GArray *results)
+perl_call_chain_filter (const char *function, struct worker_task *task)
 {
-       int result, i;
+       int result;
 
        dSP;
 
        ENTER;
        SAVETMPS;
        PUSHMARK (SP);
-       for (i = 0; i < results->len; i ++) {
-               XPUSHs (sv_2mortal (newSViv (g_array_index (results, int, i))));
-       }
+       XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
        PUTBACK;
        
        call_pv (function, G_SCALAR);
@@ -161,3 +151,28 @@ perl_call_chain_filter (const char *function, GArray *results)
 
        return result;
 }
+
+void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+       struct {
+        SV *callback;
+        struct worker_task *task;
+    } *callback_data = data;
+       
+       dSP;
+
+       ENTER;
+       SAVETMPS;
+       PUSHMARK (SP);
+       XPUSHs (sv_2mortal (newSViv (PTR2IV (callback_data->task))));
+       XPUSHs (sv_2mortal (newSViv (error)));
+       XPUSHs (sv_2mortal (newSVpv (ctx->param->buf, ctx->param->bufsize)));
+       PUTBACK;
+
+       call_sv (callback_data->callback, G_SCALAR);
+
+       SPAGAIN;
+       FREETMPS;
+       LEAVE;
+
+}
diff --git a/perl.h b/perl.h
index 3a659bbe16283d73a1e8e64ceabe8e2ef75e0104..fc2ae93213dbc718c6d3b258c4db3fb4d5774fc3 100644 (file)
--- a/perl.h
+++ b/perl.h
@@ -3,13 +3,17 @@
 
 #include <sys/types.h>
 #include <glib.h>
+#include "memcached.h"
 
 struct uri;
+struct worker_task;
 
-int perl_call_header_filter (const char *function, const char *header_name, const char *header_value);
-int perl_call_mime_filter (const char *function, GByteArray *content);
-int perl_call_message_filter (const char *function, GByteArray *content);
-int perl_call_url_filter (const char *function, struct uri *uri);
-int perl_call_chain_filter (const char *function, GArray *results);
+int perl_call_header_filter (const char *function, struct worker_task *task);
+int perl_call_mime_filter (const char *function, struct worker_task *task);
+int perl_call_message_filter (const char *function, struct worker_task *task);
+int perl_call_url_filter (const char *function, struct worker_task *task);
+int perl_call_chain_filter (const char *function, struct worker_task *task);
+
+void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data);
 
 #endif
index ada7a5997e88f3caaa9ddff920823a48be412b31..681e40308173aff5e1a9c403854f21ccaeceab6e 100644 (file)
@@ -6,12 +6,13 @@
 
 #include <sys/types.h>
 #include <unistd.h>
+#include <EXTERN.h>
+#include <perl.h>
+#include <XSUB.h>
+
 #include "../config.h"
 #include "../main.h"
-
-#include "EXTERN.h"
-#include "perl.h"
-#include "XSUB.h"
+#include "../perl.h"
 
 #define perl_set_session(r)                                                                                                    \
        r = INT2PTR(struct worker_task *, SvIV((SV *) SvRV(ST(0))))
@@ -98,3 +99,121 @@ get_part (r, num)
        OUTPUT:
        RETVAL
 
+void
+read_memcached_key (r, key, datalen, callback)
+    CODE:
+    struct worker_task *r;
+    char *key;
+    unsigned int datalen;
+    SV *callback;
+    STRLEN keylen;
+    struct _param {
+        SV *callback;
+        struct worker_task *task;
+    } *callback_data;
+    memcached_param_t param;
+
+    perl_set_session (r);
+    key = (char *) SvPV (ST(1), keylen);
+    datalen = (unsigned int) SvIV (ST(2));
+    callback = SvRV(ST(3));
+
+    r->memc_ctx->callback = perl_call_memcached_callback;
+    callback_data = malloc (sizeof (struct _param));
+    if (callback_data == NULL) {
+               XSRETURN_UNDEF;
+    }
+    callback_data->callback = callback;
+    callback_data->task = r;
+    r->memc_ctx->callback_data = (void *)callback_data;
+
+    r->memc_busy = 1;
+
+    strlcpy (param.key, key, sizeof (param.key));
+    param.buf = malloc (datalen);
+    if (param.buf != NULL) {
+        param.bufsize = datalen;
+    }
+    param.bufpos = 0;
+    param.expire = 0;
+
+    memc_get (r->memc_ctx, &param);
+    XSRETURN_EMPTY;
+
+void
+write_memcached_key (r, key, data, expire, callback)
+    CODE:
+    struct worker_task *r;
+    char *key, *data;
+    SV *callback;
+    STRLEN keylen, datalen;
+    int expire;
+    struct _param {
+        SV *callback;
+        struct worker_task *task;
+    } *callback_data;
+    memcached_param_t param;
+
+    perl_set_session (r);
+    key = (char *) SvPV (ST(1), keylen);
+    data = (char *) SvPV (ST(2), datalen);
+    expire = (int) SvIV (ST(3));
+    callback = SvRV(ST(4));
+
+    r->memc_ctx->callback = perl_call_memcached_callback;
+    callback_data = malloc (sizeof (struct _param));
+    if (callback_data == NULL) {
+               XSRETURN_UNDEF;
+    }
+    callback_data->callback = callback;
+    callback_data->task = r;
+    r->memc_ctx->callback_data = (void *)callback_data;
+
+    r->memc_busy = 1;
+
+    strlcpy (param.key, key, sizeof (param.key));
+    param.buf = data;
+    param.bufsize = datalen;
+    param.bufpos = 0;
+    param.expire = expire;
+
+    memc_set (r->memc_ctx, &param, expire);
+    XSRETURN_EMPTY;
+
+void
+delete_memcached_key (r, key, callback)
+    CODE:
+    struct worker_task *r;
+    char *key;
+    SV *callback;
+    STRLEN keylen;
+    struct _param {
+        SV *callback;
+        struct worker_task *task;
+    } *callback_data;
+    memcached_param_t param;
+
+    perl_set_session (r);
+    key = (char *) SvPV (ST(1), keylen);
+    callback = SvRV(ST(2));
+
+    r->memc_ctx->callback = perl_call_memcached_callback;
+    callback_data = malloc (sizeof (struct _param));
+    if (callback_data == NULL) {
+               XSRETURN_UNDEF;
+    }
+    callback_data->callback = callback;
+    callback_data->task = r;
+    r->memc_ctx->callback_data = (void *)callback_data;
+
+    r->memc_busy = 1;
+
+    strlcpy (param.key, key, sizeof (param.key));
+    param.buf = NULL;
+    param.bufsize = 0;
+    param.bufpos = 0;
+    param.expire = 0;
+
+    memc_delete (r->memc_ctx, &param);
+    XSRETURN_EMPTY;
+
index 0b3dd30bc24e39800c2fe0338fa8f24761e846b8..d452843928c54ebcf51f5b00e021aaf22c2a0a2e 100644 (file)
--- a/worker.c
+++ b/worker.c
@@ -92,6 +92,10 @@ free_task (struct worker_task *task)
                if (task->rcpt) {
                        free (task->rcpt);
                }
+               if (task->memc_ctx) {
+                       memc_close_ctx (task->memc_ctx);
+                       free (task->memc_ctx);
+               }
                while (!TAILQ_EMPTY (&task->urls)) {
                        cur = TAILQ_FIRST (&task->urls);
                        TAILQ_REMOVE (&task->urls, cur, next);
@@ -394,7 +398,7 @@ accept_socket (int fd, short what, void *arg)
        
        new_task = malloc (sizeof (struct worker_task));
        if (new_task == NULL) {
-               msg_err ("accept_socket: cannot allocate memory for task");
+               msg_err ("accept_socket: cannot allocate memory for task, %m");
                return;
        }
        new_task->worker = worker;
@@ -404,6 +408,15 @@ accept_socket (int fd, short what, void *arg)
        TAILQ_INIT (&new_task->urls);
        TAILQ_INIT (&new_task->results);
        TAILQ_INIT (&new_task->parts);
+       new_task->memc_ctx = malloc (sizeof (memcached_ctx_t));
+       if (new_task->memc_ctx == NULL) {
+               msg_err ("accept_socket: cannot allocate memory for memcached ctx, %m");
+       }
+       else {
+               if (memc_init_ctx (new_task->memc_ctx) == -1) {
+                       msg_err ("accept_socket: cannot init memcached context for task");
+               }
+       }
 
        /* Read event */
        new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);