From: Vsevolod Stakhov Date: Wed, 20 Aug 2008 13:09:13 +0000 (+0400) Subject: * Rewrite perl module for new XS, now perl filters got only object of rspamd module... X-Git-Tag: 0.2.7~391 X-Git-Url: https://source.dussan.org/?a=commitdiff_plain;h=e90352d20a0d5f615c906b7719f95599cb2aaeac;p=rspamd.git * Rewrite perl module for new XS, now perl filters got only object of rspamd module that provides access to every part of message * Add memcached access api for perl * Reorganize Makefile, add .PHONY targets * Fix memcached module as now we cannot write or read several keys in async mode --- diff --git a/Makefile.in b/Makefile.in index 65cd1a405..df7400786 100644 --- a/Makefile.in +++ b/Makefile.in @@ -1,9 +1,13 @@ +.PHONY: perl clean all: perl $(TARGETS) -perl: +perl: perl/Makefile cd perl && make && cd .. +perl/Makefile: perl/Makefile.PL + cd perl && perl Makefile.PL && cd .. + memctest: upstream.c memcached.c memcached-test.c $(CC) $(OPT_FLAGS) $(CFLAGS) $(PTHREAD_CFLAGS) -c upstream.c $(CC) $(OPT_FLAGS) $(CFLAGS) $(PTHREAD_CFLAGS) -c memcached.c @@ -11,6 +15,7 @@ memctest: upstream.c memcached.c memcached-test.c $(CC) $(OPT_FLAGS) $(PTHREAD_LDFLAGS) $(LD_PATH) upstream.o memcached.o memcached-test.o $(LIBS) -o memcached-test install: $(EXEC) + cd perl && make install && cd .. $(INSTALL) -b $(EXEC) $(PREFIX)/sbin/$(EXEC) $(INSTALL) -v $(EXEC).sh $(PREFIX)/etc/rc.d #$(INSTALL) -m0644 rspamd.8 $(MANPATH)/man8 diff --git a/main.h b/main.h index b2441bbe7..cb98167ae 100644 --- a/main.h +++ b/main.h @@ -21,6 +21,7 @@ #include "fstring.h" #include "url.h" +#include "memcached.h" #include #include @@ -100,6 +101,9 @@ struct worker_task { struct in_addr from_addr; f_str_buf_t *msg; struct bufferevent *bev; + /* Memcached connection for this task */ + memcached_ctx_t *memc_ctx; + unsigned memc_busy:1; /* Number of mime parts */ int parts_count; /* Headers */ diff --git a/memcached.c b/memcached.c index 566fe659f..3228d791f 100644 --- a/memcached.c +++ b/memcached.c @@ -395,9 +395,11 @@ socket_callback (int fd, short what, void *arg) event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx); event_add (&ctx->mem_ev, NULL); ctx->callback (ctx, OK, ctx->callback_data); + ctx->alive = 1; } else { ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data); + ctx->alive = 0; } break; case CMD_WRITE: @@ -538,16 +540,13 @@ memc_parse_header (char *buf, size_t *len, char **end) * Common read command handler for memcached */ memc_error_t -memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem) +memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param) { - int i; - for (i = 0; i < *nelem; i++) { - ctx->cmd = cmd; - ctx->op = CMD_READ; - ctx->param = ¶ms[i]; - event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - } + ctx->cmd = cmd; + ctx->op = CMD_READ; + ctx->param = param; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); return OK; } @@ -556,16 +555,13 @@ memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, siz * Common write command handler for memcached */ memc_error_t -memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire) +memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire) { - int i; - for (i = 0; i < *nelem; i++) { - ctx->cmd = cmd; - ctx->op = CMD_WRITE; - ctx->param = ¶ms[i]; - event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - } + ctx->cmd = cmd; + ctx->op = CMD_WRITE; + ctx->param = param; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); return OK; } @@ -573,17 +569,13 @@ memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, si * Delete command handler */ memc_error_t -memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem) +memc_delete (memcached_ctx_t *ctx, memcached_param_t *param) { - int i; - - for (i = 0; i < *nelem; i++) { - ctx->cmd = "delete"; - ctx->op = CMD_DELETE; - ctx->param = ¶ms[i]; - event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); - event_add (&ctx->mem_ev, &ctx->timeout); - } + ctx->cmd = "delete"; + ctx->op = CMD_DELETE; + ctx->param = param; + event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx); + event_add (&ctx->mem_ev, &ctx->timeout); return OK; } @@ -593,13 +585,13 @@ memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem) * writing is done to each memcached server */ memc_error_t -memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem, int expire) +memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire) { memc_error_t r, result = OK; while (memcached_num --) { if (ctx[memcached_num].alive == 1) { - r = memc_write (&ctx[memcached_num], cmd, params, nelem, expire); + r = memc_write (&ctx[memcached_num], cmd, param, expire); if (r != OK) { memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r)); result = r; @@ -616,13 +608,13 @@ memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, * reading is done from first active memcached server */ memc_error_t -memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem) +memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param) { memc_error_t r, result = OK; while (memcached_num --) { if (ctx[memcached_num].alive == 1) { - r = memc_read (&ctx[memcached_num], cmd, params, nelem); + r = memc_read (&ctx[memcached_num], cmd, param); if (r != OK) { result = r; if (r != NOT_EXISTS) { @@ -647,13 +639,13 @@ memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, m * deleting is done for each active memcached server */ memc_error_t -memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem) +memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param) { memc_error_t r, result = OK; while (memcached_num --) { if (ctx[memcached_num].alive == 1) { - r = memc_delete (&ctx[memcached_num], params, nelem); + r = memc_delete (&ctx[memcached_num], param); if (r != OK) { result = r; if (r != NOT_EXISTS) { @@ -679,7 +671,7 @@ memc_init_ctx (memcached_ctx_t *ctx) } ctx->count = 0; - ctx->alive = 1; + ctx->alive = 0; ctx->op = CMD_NULL; /* Set default callback */ if (ctx->callback == NULL) { diff --git a/memcached.h b/memcached.h index f3872456d..c815884a3 100644 --- a/memcached.h +++ b/memcached.h @@ -108,29 +108,29 @@ int memc_init_ctx_mirror (memcached_ctx_t *ctx, size_t memcached_num); * "prepend" means "add this data to an existing key before existing data". */ -#define memc_get(ctx, params, nelem) memc_read(ctx, "get", params, nelem) -#define memc_set(ctx, params, nelem, expire) memc_write(ctx, "set", params, nelem, expire) -#define memc_add(ctx, params, nelem, expire) memc_write(ctx, "add", params, nelem, expire) -#define memc_replace(ctx, params, nelem, expire) memc_write(ctx, "replace", params, nelem, expire) -#define memc_append(ctx, params, nelem, expire) memc_write(ctx, "append", params, nelem, expire) -#define memc_prepend(ctx, params, nelem, expire) memc_write(ctx, "prepend", params, nelem, expire) +#define memc_get(ctx, param) memc_read(ctx, "get", param) +#define memc_set(ctx, param, expire) memc_write(ctx, "set", param, expire) +#define memc_add(ctx, param, expire) memc_write(ctx, "add", param, expire) +#define memc_replace(ctx, param, expire) memc_write(ctx, "replace", param, expire) +#define memc_append(ctx, param, expire) memc_write(ctx, "append", param, expire) +#define memc_prepend(ctx, param, expire) memc_write(ctx, "prepend", param, expire) /* Functions that works with mirror of memcached servers */ -#define memc_get_mirror(ctx, num, params, nelem) memc_read_mirror(ctx, num, "get", params, nelem) -#define memc_set_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "set", params, nelem, expire) -#define memc_add_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "add", params, nelem, expire) -#define memc_replace_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "replace", params, nelem, expire) -#define memc_append_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "append", params, nelem, expire) -#define memc_prepend_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "prepend", params, nelem, expire) +#define memc_get_mirror(ctx, num, param) memc_read_mirror(ctx, num, "get", param) +#define memc_set_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "set", param, expire) +#define memc_add_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "add", param, expire) +#define memc_replace_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "replace", param, expire) +#define memc_append_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "append", param, expire) +#define memc_prepend_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "prepend", param, expire) -memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem); -memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire); -memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem); +memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param); +memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire); +memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params); -memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem, int expire); -memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem); -memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem); +memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire); +memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param); +memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param); /* Return symbolic name of memcached error*/ const char * memc_strerror (memc_error_t err); diff --git a/perl.c b/perl.c index e954292a3..e2ae086eb 100644 --- a/perl.c +++ b/perl.c @@ -17,7 +17,7 @@ extern PerlInterpreter *my_perl; int -perl_call_header_filter (const char *function, const char *header_name, const char *header_value) +perl_call_header_filter (const char *function, struct worker_task *task) { int result; dSP; @@ -26,8 +26,7 @@ perl_call_header_filter (const char *function, const char *header_name, const ch SAVETMPS; PUSHMARK (SP); - XPUSHs (sv_2mortal (newSVpv (header_name, 0))); - XPUSHs (sv_2mortal (newSVpv (header_value, 0))); + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); PUTBACK; call_pv (function, G_SCALAR); @@ -35,7 +34,7 @@ perl_call_header_filter (const char *function, const char *header_name, const ch SPAGAIN; result = POPi; - msg_debug ("header_filter: call of %s with header %s returned mark %d\n", function, header_name, result); + msg_debug ("header_filter: call of %s with returned mark %d\n", function, result); PUTBACK; FREETMPS; @@ -45,7 +44,7 @@ perl_call_header_filter (const char *function, const char *header_name, const ch } int -perl_call_mime_filter (const char *function, GByteArray *content) +perl_call_mime_filter (const char *function, struct worker_task *task) { int result; dSP; @@ -54,7 +53,7 @@ perl_call_mime_filter (const char *function, GByteArray *content) SAVETMPS; PUSHMARK (SP); - XPUSHs (sv_2mortal (newSVpv (content->data, content->len))); + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); PUTBACK; call_pv (function, G_SCALAR); @@ -72,7 +71,7 @@ perl_call_mime_filter (const char *function, GByteArray *content) } int -perl_call_message_filter (const char *function, GByteArray *content) +perl_call_message_filter (const char *function, struct worker_task *task) { int result; dSP; @@ -81,7 +80,7 @@ perl_call_message_filter (const char *function, GByteArray *content) SAVETMPS; PUSHMARK (SP); - XPUSHs (sv_2mortal (newSVpv (content->data, content->len))); + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); PUTBACK; call_pv (function, G_SCALAR); @@ -99,7 +98,7 @@ perl_call_message_filter (const char *function, GByteArray *content) } int -perl_call_url_filter (const char *function, struct uri *uri) +perl_call_url_filter (const char *function, struct worker_task *task) { int result; dSP; @@ -107,15 +106,8 @@ perl_call_url_filter (const char *function, struct uri *uri) ENTER; SAVETMPS; - /* URL: - * url, - * host, - * data - */ PUSHMARK (SP); - XPUSHs (sv_2mortal (newSVpv (uri->string, 0))); - XPUSHs (sv_2mortal (newSVpv (uri->host, uri->hostlen))); - XPUSHs (sv_2mortal (newSVpv (uri->data, uri->datalen))); + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); PUTBACK; call_pv (function, G_SCALAR); @@ -123,7 +115,7 @@ perl_call_url_filter (const char *function, struct uri *uri) SPAGAIN; result = POPi; - msg_debug ("url_filter: call of %s for url '%s' returned mark %d\n", function, uri->string, result); + msg_debug ("url_filter: call of %s for url returned mark %d\n", function, result); PUTBACK; FREETMPS; @@ -133,18 +125,16 @@ perl_call_url_filter (const char *function, struct uri *uri) } int -perl_call_chain_filter (const char *function, GArray *results) +perl_call_chain_filter (const char *function, struct worker_task *task) { - int result, i; + int result; dSP; ENTER; SAVETMPS; PUSHMARK (SP); - for (i = 0; i < results->len; i ++) { - XPUSHs (sv_2mortal (newSViv (g_array_index (results, int, i)))); - } + XPUSHs (sv_2mortal (newSViv (PTR2IV (task)))); PUTBACK; call_pv (function, G_SCALAR); @@ -161,3 +151,28 @@ perl_call_chain_filter (const char *function, GArray *results) return result; } + +void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data) +{ + struct { + SV *callback; + struct worker_task *task; + } *callback_data = data; + + dSP; + + ENTER; + SAVETMPS; + PUSHMARK (SP); + XPUSHs (sv_2mortal (newSViv (PTR2IV (callback_data->task)))); + XPUSHs (sv_2mortal (newSViv (error))); + XPUSHs (sv_2mortal (newSVpv (ctx->param->buf, ctx->param->bufsize))); + PUTBACK; + + call_sv (callback_data->callback, G_SCALAR); + + SPAGAIN; + FREETMPS; + LEAVE; + +} diff --git a/perl.h b/perl.h index 3a659bbe1..fc2ae9321 100644 --- a/perl.h +++ b/perl.h @@ -3,13 +3,17 @@ #include #include +#include "memcached.h" struct uri; +struct worker_task; -int perl_call_header_filter (const char *function, const char *header_name, const char *header_value); -int perl_call_mime_filter (const char *function, GByteArray *content); -int perl_call_message_filter (const char *function, GByteArray *content); -int perl_call_url_filter (const char *function, struct uri *uri); -int perl_call_chain_filter (const char *function, GArray *results); +int perl_call_header_filter (const char *function, struct worker_task *task); +int perl_call_mime_filter (const char *function, struct worker_task *task); +int perl_call_message_filter (const char *function, struct worker_task *task); +int perl_call_url_filter (const char *function, struct worker_task *task); +int perl_call_chain_filter (const char *function, struct worker_task *task); + +void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data); #endif diff --git a/perl/rspamd.xs b/perl/rspamd.xs index ada7a5997..681e40308 100644 --- a/perl/rspamd.xs +++ b/perl/rspamd.xs @@ -6,12 +6,13 @@ #include #include +#include +#include +#include + #include "../config.h" #include "../main.h" - -#include "EXTERN.h" -#include "perl.h" -#include "XSUB.h" +#include "../perl.h" #define perl_set_session(r) \ r = INT2PTR(struct worker_task *, SvIV((SV *) SvRV(ST(0)))) @@ -98,3 +99,121 @@ get_part (r, num) OUTPUT: RETVAL +void +read_memcached_key (r, key, datalen, callback) + CODE: + struct worker_task *r; + char *key; + unsigned int datalen; + SV *callback; + STRLEN keylen; + struct _param { + SV *callback; + struct worker_task *task; + } *callback_data; + memcached_param_t param; + + perl_set_session (r); + key = (char *) SvPV (ST(1), keylen); + datalen = (unsigned int) SvIV (ST(2)); + callback = SvRV(ST(3)); + + r->memc_ctx->callback = perl_call_memcached_callback; + callback_data = malloc (sizeof (struct _param)); + if (callback_data == NULL) { + XSRETURN_UNDEF; + } + callback_data->callback = callback; + callback_data->task = r; + r->memc_ctx->callback_data = (void *)callback_data; + + r->memc_busy = 1; + + strlcpy (param.key, key, sizeof (param.key)); + param.buf = malloc (datalen); + if (param.buf != NULL) { + param.bufsize = datalen; + } + param.bufpos = 0; + param.expire = 0; + + memc_get (r->memc_ctx, ¶m); + XSRETURN_EMPTY; + +void +write_memcached_key (r, key, data, expire, callback) + CODE: + struct worker_task *r; + char *key, *data; + SV *callback; + STRLEN keylen, datalen; + int expire; + struct _param { + SV *callback; + struct worker_task *task; + } *callback_data; + memcached_param_t param; + + perl_set_session (r); + key = (char *) SvPV (ST(1), keylen); + data = (char *) SvPV (ST(2), datalen); + expire = (int) SvIV (ST(3)); + callback = SvRV(ST(4)); + + r->memc_ctx->callback = perl_call_memcached_callback; + callback_data = malloc (sizeof (struct _param)); + if (callback_data == NULL) { + XSRETURN_UNDEF; + } + callback_data->callback = callback; + callback_data->task = r; + r->memc_ctx->callback_data = (void *)callback_data; + + r->memc_busy = 1; + + strlcpy (param.key, key, sizeof (param.key)); + param.buf = data; + param.bufsize = datalen; + param.bufpos = 0; + param.expire = expire; + + memc_set (r->memc_ctx, ¶m, expire); + XSRETURN_EMPTY; + +void +delete_memcached_key (r, key, callback) + CODE: + struct worker_task *r; + char *key; + SV *callback; + STRLEN keylen; + struct _param { + SV *callback; + struct worker_task *task; + } *callback_data; + memcached_param_t param; + + perl_set_session (r); + key = (char *) SvPV (ST(1), keylen); + callback = SvRV(ST(2)); + + r->memc_ctx->callback = perl_call_memcached_callback; + callback_data = malloc (sizeof (struct _param)); + if (callback_data == NULL) { + XSRETURN_UNDEF; + } + callback_data->callback = callback; + callback_data->task = r; + r->memc_ctx->callback_data = (void *)callback_data; + + r->memc_busy = 1; + + strlcpy (param.key, key, sizeof (param.key)); + param.buf = NULL; + param.bufsize = 0; + param.bufpos = 0; + param.expire = 0; + + memc_delete (r->memc_ctx, ¶m); + XSRETURN_EMPTY; + diff --git a/worker.c b/worker.c index 0b3dd30bc..d45284392 100644 --- a/worker.c +++ b/worker.c @@ -92,6 +92,10 @@ free_task (struct worker_task *task) if (task->rcpt) { free (task->rcpt); } + if (task->memc_ctx) { + memc_close_ctx (task->memc_ctx); + free (task->memc_ctx); + } while (!TAILQ_EMPTY (&task->urls)) { cur = TAILQ_FIRST (&task->urls); TAILQ_REMOVE (&task->urls, cur, next); @@ -394,7 +398,7 @@ accept_socket (int fd, short what, void *arg) new_task = malloc (sizeof (struct worker_task)); if (new_task == NULL) { - msg_err ("accept_socket: cannot allocate memory for task"); + msg_err ("accept_socket: cannot allocate memory for task, %m"); return; } new_task->worker = worker; @@ -404,6 +408,15 @@ accept_socket (int fd, short what, void *arg) TAILQ_INIT (&new_task->urls); TAILQ_INIT (&new_task->results); TAILQ_INIT (&new_task->parts); + new_task->memc_ctx = malloc (sizeof (memcached_ctx_t)); + if (new_task->memc_ctx == NULL) { + msg_err ("accept_socket: cannot allocate memory for memcached ctx, %m"); + } + else { + if (memc_init_ctx (new_task->memc_ctx) == -1) { + msg_err ("accept_socket: cannot init memcached context for task"); + } + } /* Read event */ new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);