+.PHONY: perl clean
all: perl $(TARGETS)
-perl:
+perl: perl/Makefile
cd perl && make && cd ..
+perl/Makefile: perl/Makefile.PL
+ cd perl && perl Makefile.PL && cd ..
+
memctest: upstream.c memcached.c memcached-test.c
$(CC) $(OPT_FLAGS) $(CFLAGS) $(PTHREAD_CFLAGS) -c upstream.c
$(CC) $(OPT_FLAGS) $(CFLAGS) $(PTHREAD_CFLAGS) -c memcached.c
$(CC) $(OPT_FLAGS) $(PTHREAD_LDFLAGS) $(LD_PATH) upstream.o memcached.o memcached-test.o $(LIBS) -o memcached-test
install: $(EXEC)
+ cd perl && make install && cd ..
$(INSTALL) -b $(EXEC) $(PREFIX)/sbin/$(EXEC)
$(INSTALL) -v $(EXEC).sh $(PREFIX)/etc/rc.d
#$(INSTALL) -m0644 rspamd.8 $(MANPATH)/man8
#include "fstring.h"
#include "url.h"
+#include "memcached.h"
#include <glib.h>
#include <gmime/gmime.h>
struct in_addr from_addr;
f_str_buf_t *msg;
struct bufferevent *bev;
+ /* Memcached connection for this task */
+ memcached_ctx_t *memc_ctx;
+ unsigned memc_busy:1;
/* Number of mime parts */
int parts_count;
/* Headers */
event_set (&ctx->mem_ev, ctx->sock, EV_READ | EV_PERSIST | EV_TIMEOUT, socket_callback, (void *)ctx);
event_add (&ctx->mem_ev, NULL);
ctx->callback (ctx, OK, ctx->callback_data);
+ ctx->alive = 1;
}
else {
ctx->callback (ctx, SERVER_TIMEOUT, ctx->callback_data);
+ ctx->alive = 0;
}
break;
case CMD_WRITE:
* Common read command handler for memcached
*/
memc_error_t
-memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem)
+memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param)
{
- int i;
- for (i = 0; i < *nelem; i++) {
- ctx->cmd = cmd;
- ctx->op = CMD_READ;
- ctx->param = ¶ms[i];
- event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
- event_add (&ctx->mem_ev, &ctx->timeout);
- }
+ ctx->cmd = cmd;
+ ctx->op = CMD_READ;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
return OK;
}
* Common write command handler for memcached
*/
memc_error_t
-memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire)
+memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire)
{
- int i;
- for (i = 0; i < *nelem; i++) {
- ctx->cmd = cmd;
- ctx->op = CMD_WRITE;
- ctx->param = ¶ms[i];
- event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
- event_add (&ctx->mem_ev, &ctx->timeout);
- }
+ ctx->cmd = cmd;
+ ctx->op = CMD_WRITE;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
return OK;
}
* Delete command handler
*/
memc_error_t
-memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem)
+memc_delete (memcached_ctx_t *ctx, memcached_param_t *param)
{
- int i;
-
- for (i = 0; i < *nelem; i++) {
- ctx->cmd = "delete";
- ctx->op = CMD_DELETE;
- ctx->param = ¶ms[i];
- event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
- event_add (&ctx->mem_ev, &ctx->timeout);
- }
+ ctx->cmd = "delete";
+ ctx->op = CMD_DELETE;
+ ctx->param = param;
+ event_set (&ctx->mem_ev, ctx->sock, EV_WRITE | EV_TIMEOUT, socket_callback, (void *)ctx);
+ event_add (&ctx->mem_ev, &ctx->timeout);
return OK;
}
* writing is done to each memcached server
*/
memc_error_t
-memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem, int expire)
+memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire)
{
memc_error_t r, result = OK;
while (memcached_num --) {
if (ctx[memcached_num].alive == 1) {
- r = memc_write (&ctx[memcached_num], cmd, params, nelem, expire);
+ r = memc_write (&ctx[memcached_num], cmd, param, expire);
if (r != OK) {
memc_log (&ctx[memcached_num], __LINE__, "memc_write_mirror: cannot write to mirror server: %s", memc_strerror (r));
result = r;
* reading is done from first active memcached server
*/
memc_error_t
-memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem)
+memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
{
memc_error_t r, result = OK;
while (memcached_num --) {
if (ctx[memcached_num].alive == 1) {
- r = memc_read (&ctx[memcached_num], cmd, params, nelem);
+ r = memc_read (&ctx[memcached_num], cmd, param);
if (r != OK) {
result = r;
if (r != NOT_EXISTS) {
* deleting is done for each active memcached server
*/
memc_error_t
-memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem)
+memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param)
{
memc_error_t r, result = OK;
while (memcached_num --) {
if (ctx[memcached_num].alive == 1) {
- r = memc_delete (&ctx[memcached_num], params, nelem);
+ r = memc_delete (&ctx[memcached_num], param);
if (r != OK) {
result = r;
if (r != NOT_EXISTS) {
}
ctx->count = 0;
- ctx->alive = 1;
+ ctx->alive = 0;
ctx->op = CMD_NULL;
/* Set default callback */
if (ctx->callback == NULL) {
* "prepend" means "add this data to an existing key before existing data".
*/
-#define memc_get(ctx, params, nelem) memc_read(ctx, "get", params, nelem)
-#define memc_set(ctx, params, nelem, expire) memc_write(ctx, "set", params, nelem, expire)
-#define memc_add(ctx, params, nelem, expire) memc_write(ctx, "add", params, nelem, expire)
-#define memc_replace(ctx, params, nelem, expire) memc_write(ctx, "replace", params, nelem, expire)
-#define memc_append(ctx, params, nelem, expire) memc_write(ctx, "append", params, nelem, expire)
-#define memc_prepend(ctx, params, nelem, expire) memc_write(ctx, "prepend", params, nelem, expire)
+#define memc_get(ctx, param) memc_read(ctx, "get", param)
+#define memc_set(ctx, param, expire) memc_write(ctx, "set", param, expire)
+#define memc_add(ctx, param, expire) memc_write(ctx, "add", param, expire)
+#define memc_replace(ctx, param, expire) memc_write(ctx, "replace", param, expire)
+#define memc_append(ctx, param, expire) memc_write(ctx, "append", param, expire)
+#define memc_prepend(ctx, param, expire) memc_write(ctx, "prepend", param, expire)
/* Functions that works with mirror of memcached servers */
-#define memc_get_mirror(ctx, num, params, nelem) memc_read_mirror(ctx, num, "get", params, nelem)
-#define memc_set_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "set", params, nelem, expire)
-#define memc_add_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "add", params, nelem, expire)
-#define memc_replace_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "replace", params, nelem, expire)
-#define memc_append_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "append", params, nelem, expire)
-#define memc_prepend_mirror(ctx, num, params, nelem, expire) memc_write_mirror(ctx, num, "prepend", params, nelem, expire)
+#define memc_get_mirror(ctx, num, param) memc_read_mirror(ctx, num, "get", param)
+#define memc_set_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "set", param, expire)
+#define memc_add_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "add", param, expire)
+#define memc_replace_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "replace", param, expire)
+#define memc_append_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "append", param, expire)
+#define memc_prepend_mirror(ctx, num, param, expire) memc_write_mirror(ctx, num, "prepend", param, expire)
-memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem);
-memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *params, size_t *nelem, int expire);
-memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params, size_t *nelem);
+memc_error_t memc_read (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param);
+memc_error_t memc_write (memcached_ctx_t *ctx, const char *cmd, memcached_param_t *param, int expire);
+memc_error_t memc_delete (memcached_ctx_t *ctx, memcached_param_t *params);
-memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem, int expire);
-memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem);
-memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *params, size_t *nelem);
+memc_error_t memc_write_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param, int expire);
+memc_error_t memc_read_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param);
+memc_error_t memc_delete_mirror (memcached_ctx_t *ctx, size_t memcached_num, const char *cmd, memcached_param_t *param);
/* Return symbolic name of memcached error*/
const char * memc_strerror (memc_error_t err);
extern PerlInterpreter *my_perl;
int
-perl_call_header_filter (const char *function, const char *header_name, const char *header_value)
+perl_call_header_filter (const char *function, struct worker_task *task)
{
int result;
dSP;
SAVETMPS;
PUSHMARK (SP);
- XPUSHs (sv_2mortal (newSVpv (header_name, 0)));
- XPUSHs (sv_2mortal (newSVpv (header_value, 0)));
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
SPAGAIN;
result = POPi;
- msg_debug ("header_filter: call of %s with header %s returned mark %d\n", function, header_name, result);
+ msg_debug ("header_filter: call of %s with returned mark %d\n", function, result);
PUTBACK;
FREETMPS;
}
int
-perl_call_mime_filter (const char *function, GByteArray *content)
+perl_call_mime_filter (const char *function, struct worker_task *task)
{
int result;
dSP;
SAVETMPS;
PUSHMARK (SP);
- XPUSHs (sv_2mortal (newSVpv (content->data, content->len)));
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
}
int
-perl_call_message_filter (const char *function, GByteArray *content)
+perl_call_message_filter (const char *function, struct worker_task *task)
{
int result;
dSP;
SAVETMPS;
PUSHMARK (SP);
- XPUSHs (sv_2mortal (newSVpv (content->data, content->len)));
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
}
int
-perl_call_url_filter (const char *function, struct uri *uri)
+perl_call_url_filter (const char *function, struct worker_task *task)
{
int result;
dSP;
ENTER;
SAVETMPS;
- /* URL:
- * url,
- * host,
- * data
- */
PUSHMARK (SP);
- XPUSHs (sv_2mortal (newSVpv (uri->string, 0)));
- XPUSHs (sv_2mortal (newSVpv (uri->host, uri->hostlen)));
- XPUSHs (sv_2mortal (newSVpv (uri->data, uri->datalen)));
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
SPAGAIN;
result = POPi;
- msg_debug ("url_filter: call of %s for url '%s' returned mark %d\n", function, uri->string, result);
+ msg_debug ("url_filter: call of %s for url returned mark %d\n", function, result);
PUTBACK;
FREETMPS;
}
int
-perl_call_chain_filter (const char *function, GArray *results)
+perl_call_chain_filter (const char *function, struct worker_task *task)
{
- int result, i;
+ int result;
dSP;
ENTER;
SAVETMPS;
PUSHMARK (SP);
- for (i = 0; i < results->len; i ++) {
- XPUSHs (sv_2mortal (newSViv (g_array_index (results, int, i))));
- }
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
PUTBACK;
call_pv (function, G_SCALAR);
return result;
}
+
+void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data)
+{
+ struct {
+ SV *callback;
+ struct worker_task *task;
+ } *callback_data = data;
+
+ dSP;
+
+ ENTER;
+ SAVETMPS;
+ PUSHMARK (SP);
+ XPUSHs (sv_2mortal (newSViv (PTR2IV (callback_data->task))));
+ XPUSHs (sv_2mortal (newSViv (error)));
+ XPUSHs (sv_2mortal (newSVpv (ctx->param->buf, ctx->param->bufsize)));
+ PUTBACK;
+
+ call_sv (callback_data->callback, G_SCALAR);
+
+ SPAGAIN;
+ FREETMPS;
+ LEAVE;
+
+}
#include <sys/types.h>
#include <glib.h>
+#include "memcached.h"
struct uri;
+struct worker_task;
-int perl_call_header_filter (const char *function, const char *header_name, const char *header_value);
-int perl_call_mime_filter (const char *function, GByteArray *content);
-int perl_call_message_filter (const char *function, GByteArray *content);
-int perl_call_url_filter (const char *function, struct uri *uri);
-int perl_call_chain_filter (const char *function, GArray *results);
+int perl_call_header_filter (const char *function, struct worker_task *task);
+int perl_call_mime_filter (const char *function, struct worker_task *task);
+int perl_call_message_filter (const char *function, struct worker_task *task);
+int perl_call_url_filter (const char *function, struct worker_task *task);
+int perl_call_chain_filter (const char *function, struct worker_task *task);
+
+void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data);
#endif
#include <sys/types.h>
#include <unistd.h>
+#include <EXTERN.h>
+#include <perl.h>
+#include <XSUB.h>
+
#include "../config.h"
#include "../main.h"
-
-#include "EXTERN.h"
-#include "perl.h"
-#include "XSUB.h"
+#include "../perl.h"
#define perl_set_session(r) \
r = INT2PTR(struct worker_task *, SvIV((SV *) SvRV(ST(0))))
OUTPUT:
RETVAL
+void
+read_memcached_key (r, key, datalen, callback)
+ CODE:
+ struct worker_task *r;
+ char *key;
+ unsigned int datalen;
+ SV *callback;
+ STRLEN keylen;
+ struct _param {
+ SV *callback;
+ struct worker_task *task;
+ } *callback_data;
+ memcached_param_t param;
+
+ perl_set_session (r);
+ key = (char *) SvPV (ST(1), keylen);
+ datalen = (unsigned int) SvIV (ST(2));
+ callback = SvRV(ST(3));
+
+ r->memc_ctx->callback = perl_call_memcached_callback;
+ callback_data = malloc (sizeof (struct _param));
+ if (callback_data == NULL) {
+ XSRETURN_UNDEF;
+ }
+ callback_data->callback = callback;
+ callback_data->task = r;
+ r->memc_ctx->callback_data = (void *)callback_data;
+
+ r->memc_busy = 1;
+
+ strlcpy (param.key, key, sizeof (param.key));
+ param.buf = malloc (datalen);
+ if (param.buf != NULL) {
+ param.bufsize = datalen;
+ }
+ param.bufpos = 0;
+ param.expire = 0;
+
+ memc_get (r->memc_ctx, ¶m);
+ XSRETURN_EMPTY;
+
+void
+write_memcached_key (r, key, data, expire, callback)
+ CODE:
+ struct worker_task *r;
+ char *key, *data;
+ SV *callback;
+ STRLEN keylen, datalen;
+ int expire;
+ struct _param {
+ SV *callback;
+ struct worker_task *task;
+ } *callback_data;
+ memcached_param_t param;
+
+ perl_set_session (r);
+ key = (char *) SvPV (ST(1), keylen);
+ data = (char *) SvPV (ST(2), datalen);
+ expire = (int) SvIV (ST(3));
+ callback = SvRV(ST(4));
+
+ r->memc_ctx->callback = perl_call_memcached_callback;
+ callback_data = malloc (sizeof (struct _param));
+ if (callback_data == NULL) {
+ XSRETURN_UNDEF;
+ }
+ callback_data->callback = callback;
+ callback_data->task = r;
+ r->memc_ctx->callback_data = (void *)callback_data;
+
+ r->memc_busy = 1;
+
+ strlcpy (param.key, key, sizeof (param.key));
+ param.buf = data;
+ param.bufsize = datalen;
+ param.bufpos = 0;
+ param.expire = expire;
+
+ memc_set (r->memc_ctx, ¶m, expire);
+ XSRETURN_EMPTY;
+
+void
+delete_memcached_key (r, key, callback)
+ CODE:
+ struct worker_task *r;
+ char *key;
+ SV *callback;
+ STRLEN keylen;
+ struct _param {
+ SV *callback;
+ struct worker_task *task;
+ } *callback_data;
+ memcached_param_t param;
+
+ perl_set_session (r);
+ key = (char *) SvPV (ST(1), keylen);
+ callback = SvRV(ST(2));
+
+ r->memc_ctx->callback = perl_call_memcached_callback;
+ callback_data = malloc (sizeof (struct _param));
+ if (callback_data == NULL) {
+ XSRETURN_UNDEF;
+ }
+ callback_data->callback = callback;
+ callback_data->task = r;
+ r->memc_ctx->callback_data = (void *)callback_data;
+
+ r->memc_busy = 1;
+
+ strlcpy (param.key, key, sizeof (param.key));
+ param.buf = NULL;
+ param.bufsize = 0;
+ param.bufpos = 0;
+ param.expire = 0;
+
+ memc_delete (r->memc_ctx, ¶m);
+ XSRETURN_EMPTY;
+
if (task->rcpt) {
free (task->rcpt);
}
+ if (task->memc_ctx) {
+ memc_close_ctx (task->memc_ctx);
+ free (task->memc_ctx);
+ }
while (!TAILQ_EMPTY (&task->urls)) {
cur = TAILQ_FIRST (&task->urls);
TAILQ_REMOVE (&task->urls, cur, next);
new_task = malloc (sizeof (struct worker_task));
if (new_task == NULL) {
- msg_err ("accept_socket: cannot allocate memory for task");
+ msg_err ("accept_socket: cannot allocate memory for task, %m");
return;
}
new_task->worker = worker;
TAILQ_INIT (&new_task->urls);
TAILQ_INIT (&new_task->results);
TAILQ_INIT (&new_task->parts);
+ new_task->memc_ctx = malloc (sizeof (memcached_ctx_t));
+ if (new_task->memc_ctx == NULL) {
+ msg_err ("accept_socket: cannot allocate memory for memcached ctx, %m");
+ }
+ else {
+ if (memc_init_ctx (new_task->memc_ctx) == -1) {
+ msg_err ("accept_socket: cannot init memcached context for task");
+ }
+ }
/* Read event */
new_task->bev = bufferevent_new (nfd, read_socket, write_socket, err_socket, (void *)new_task);