summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-08-22 18:15:02 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-08-22 18:15:02 +0400
commita4473aedcb7c49b494112bce63d06c98d88e0e0d (patch)
tree17c39ecf0b973dd3b74db3b02d678944eac649d9
parentf3b6712e04fd993caccaa18e425639d85d81b1eb (diff)
downloadrspamd-a4473aedcb7c49b494112bce63d06c98d88e0e0d.tar.gz
rspamd-a4473aedcb7c49b494112bce63d06c98d88e0e0d.zip
* Add implementation of save point for async events in rspamd filters
-rw-r--r--main.h11
-rw-r--r--perl.c3
-rw-r--r--perl/rspamd.xs22
-rw-r--r--worker.c107
4 files changed, 127 insertions, 16 deletions
diff --git a/main.h b/main.h
index fdf22b8ba..6e973ef20 100644
--- a/main.h
+++ b/main.h
@@ -93,6 +93,13 @@ struct mime_part {
TAILQ_ENTRY (mime_part) next;
};
+struct save_point {
+ enum { C_FILTER, PERL_FILTER } save_type;
+ void *entry;
+ void *chain;
+ unsigned saved:1;
+};
+
struct worker_task {
struct rspamd_worker *worker;
enum {
@@ -101,6 +108,7 @@ struct worker_task {
READ_MESSAGE,
WRITE_REPLY,
WRITE_ERROR,
+ WAIT_FILTER,
} state;
size_t content_length;
char *helo;
@@ -126,6 +134,7 @@ struct worker_task {
/* Results of all chains */
TAILQ_HEAD (chainsq, chain_result) chain_results;
struct config_file *cfg;
+ struct save_point save;
};
struct module_ctx {
@@ -142,7 +151,7 @@ struct c_module {
};
void start_worker (struct rspamd_worker *worker, int listen_sock);
-
+int process_filters (struct worker_task *task);
#endif
diff --git a/perl.c b/perl.c
index 2cdfacf55..2e1c9e5d7 100644
--- a/perl.c
+++ b/perl.c
@@ -181,6 +181,9 @@ void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, voi
free (callback_data);
free (ctx);
+ /* Set save point */
+ callback_data->task->save.saved = 0;
+ process_filters (callback_data->task);
SPAGAIN;
FREETMPS;
diff --git a/perl/rspamd.xs b/perl/rspamd.xs
index 4ec40f0fb..1632ebade 100644
--- a/perl/rspamd.xs
+++ b/perl/rspamd.xs
@@ -100,6 +100,22 @@ get_part (r, num)
RETVAL
void
+save_point (r)
+ CODE:
+ struct worker_task *r;
+
+ perl_set_session (r);
+ r->save.saved = 1;
+
+void
+recall_filter (r)
+ CODE:
+ struct worker_task *r;
+
+ perl_set_session (r);
+ process_filters (r);
+
+void
read_memcached_key (r, key, datalen, callback)
CODE:
struct worker_task *r;
@@ -144,6 +160,8 @@ read_memcached_key (r, key, datalen, callback)
param.expire = 0;
memc_get (ctx, &param);
+ /* Set save point */
+ r->save.saved = 1;
XSRETURN_EMPTY;
void
@@ -190,6 +208,8 @@ write_memcached_key (r, key, data, expire, callback)
param.expire = expire;
memc_set (ctx, &param, expire);
+ /* Set save point */
+ r->save.saved = 1;
XSRETURN_EMPTY;
void
@@ -233,5 +253,7 @@ delete_memcached_key (r, key, callback)
param.expire = 0;
memc_delete (ctx, &param);
+ /* Set save point */
+ r->save.saved = 1;
XSRETURN_EMPTY;
diff --git a/worker.c b/worker.c
index 84c9ef763..ac9dea9db 100644
--- a/worker.c
+++ b/worker.c
@@ -193,47 +193,82 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data)
}
}
-static void
+int
process_filters (struct worker_task *task)
{
struct filter_result *res = NULL;
struct chain_result *chain_res = NULL;
- struct c_module *c_filter;
- struct filter_chain *chain;
- struct script_param *perl_script;
+ struct c_module *c_filter = NULL;
+ struct filter_chain *chain = NULL;
+ struct script_param *perl_script = NULL;
int i = 0;
/* First process C modules */
- LIST_FOREACH (c_filter, &task->cfg->c_modules, next) {
+ if (task->save.saved == 1) {
+ if (task->save.save_type == C_FILTER) {
+ task->save.saved = 0;
+ c_filter = (struct c_module *)task->save.entry;
+ }
+ else if (task->save.save_type == PERL_FILTER) {
+ chain = (struct filter_chain *)task->save.chain;
+ perl_script = (struct script_param *)task->save.entry;
+ task->save.saved = 0;
+ }
+ }
+ else {
+ c_filter = LIST_FIRST (&task->cfg->c_modules);
+ chain = LIST_FIRST (&task->cfg->filters);
+ if (chain) {
+ perl_script = LIST_FIRST (chain->scripts);
+ }
+ }
+ while (c_filter != NULL) {
res = malloc (sizeof (struct filter_result));
if (res == NULL) {
msg_err ("process_filters: malloc failed, %m");
- return;
+ return -1;
}
res->chain = NULL;
res->symbol = c_filter->name;
res->mark = 0;
if (c_filter->ctx->header_filter != NULL) {
res->mark += c_filter->ctx->header_filter (task);
+ if (task->save.saved == 1) {
+ task->save.save_type = C_FILTER;
+ goto save_point;
+ }
}
if (c_filter->ctx->message_filter != NULL) {
res->mark += c_filter->ctx->message_filter (task);
+ if (task->save.saved == 1) {
+ task->save.save_type = C_FILTER;
+ goto save_point;
+ }
}
if (c_filter->ctx->mime_filter != NULL) {
res->mark += c_filter->ctx->mime_filter (task);
+ if (task->save.saved == 1) {
+ task->save.save_type = C_FILTER;
+ goto save_point;
+ }
}
if (c_filter->ctx->url_filter != NULL) {
res->mark += c_filter->ctx->url_filter (task);
+ if (task->save.saved == 1) {
+ task->save.save_type = C_FILTER;
+ goto save_point;
+ }
}
TAILQ_INSERT_TAIL (&task->results, res, next);
+ c_filter = LIST_NEXT (c_filter, next);
}
/* Process perl chains */
- LIST_FOREACH (chain, &task->cfg->filters, next) {
+ while (chain != NULL) {
chain_res = malloc (sizeof (struct chain_result));
if (chain_res == NULL) {
msg_err ("process_filters: malloc failed, %m");
- return;
+ return -1;
}
i = 0;
chain_res->chain = chain;
@@ -242,9 +277,9 @@ process_filters (struct worker_task *task)
if (chain_res->marks == NULL) {
free (chain_res);
msg_err ("process_filters: malloc failed, %m");
- return;
+ return -1;
}
- LIST_FOREACH (perl_script, chain->scripts, next) {
+ while (perl_script != NULL) {
if (perl_script->type == SCRIPT_CHAIN) {
/* Skip chain filters first */
continue;
@@ -252,7 +287,7 @@ process_filters (struct worker_task *task)
res = malloc (sizeof (struct filter_result));
if (res == NULL) {
msg_err ("process_filters: malloc failed, %m");
- return;
+ return -1;
}
res->chain = chain;
res->symbol = perl_script->symbol;
@@ -260,22 +295,40 @@ process_filters (struct worker_task *task)
switch (perl_script->type) {
case SCRIPT_HEADER:
res->mark += perl_call_header_filter (perl_script->function, task);
+ if (task->save.saved == 1) {
+ task->save.save_type = PERL_FILTER;
+ goto save_point;
+ }
break;
case SCRIPT_MESSAGE:
res->mark += perl_call_message_filter (perl_script->function, task);
+ if (task->save.saved == 1) {
+ task->save.save_type = PERL_FILTER;
+ goto save_point;
+ }
break;
case SCRIPT_MIME:
res->mark += perl_call_mime_filter (perl_script->function, task);
+ if (task->save.saved == 1) {
+ task->save.save_type = PERL_FILTER;
+ goto save_point;
+ }
break;
case SCRIPT_URL:
res->mark += perl_call_url_filter (perl_script->function, task);
+ if (task->save.saved == 1) {
+ task->save.save_type = PERL_FILTER;
+ goto save_point;
+ }
break;
}
TAILQ_INSERT_TAIL (&task->results, res, next);
chain_res->marks[i++] = res->mark;
+ perl_script = LIST_NEXT (perl_script, next);
}
chain_res->marks_num = i;
TAILQ_INSERT_TAIL (&task->chain_results, chain_res, next);
+ chain = LIST_NEXT (chain, next);
}
/* Now process chain results */
@@ -297,9 +350,23 @@ process_filters (struct worker_task *task)
}
}
}
+
+ task->state = WRITE_REPLY;
+ bufferevent_enable (task->bev, EV_WRITE);
+ return 0;
+
+save_point:
+ if (task->save.save_type == C_FILTER) {
+ task->save.entry = LIST_NEXT (c_filter, next);
+ }
+ else if (task->save.save_type == PERL_FILTER) {
+ task->save.chain = LIST_NEXT (chain, next);
+ task->save.entry = LIST_NEXT (perl_script, next);
+ }
+ return 1;
}
-static void
+static int
process_message (struct worker_task *task)
{
GMimeMessage *message;
@@ -327,7 +394,7 @@ process_message (struct worker_task *task)
msg_info ("process_message: found %d parts in message", task->parts_count);
- process_filters (task);
+ return process_filters (task);
}
static void
@@ -448,8 +515,13 @@ read_socket (struct bufferevent *bev, void *arg)
task->msg->pos += r;
update_buf_size (task->msg);
if (task->msg->free == 0) {
- process_message (task);
- task->state = WRITE_REPLY;
+ r = process_message (task);
+ if (r == -1) {
+ task->state = WRITE_ERROR;
+ }
+ else if (r == 1) {
+ task->state = WAIT_FILTER;
+ }
}
}
else {
@@ -459,6 +531,10 @@ read_socket (struct bufferevent *bev, void *arg)
free_task (task);
}
break;
+ case WAIT_FILTER:
+ bufferevent_disable (bev, EV_READ);
+ bufferevent_disable (bev, EV_READ);
+ break;
case WRITE_REPLY:
r = bufferevent_write (bev, "Ok\r\n", sizeof ("Ok\r\n") - 1);
bufferevent_disable (bev, EV_READ);
@@ -518,6 +594,7 @@ accept_socket (int fd, short what, void *arg)
msg_err ("accept_socket: cannot allocate memory for task, %m");
return;
}
+ bzero (new_task, sizeof (struct worker_task));
new_task->worker = worker;
new_task->state = READ_COMMAND;
new_task->content_length = 0;