summaryrefslogtreecommitdiffstats
path: root/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-08-22 18:15:02 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-08-22 18:15:02 +0400
commita4473aedcb7c49b494112bce63d06c98d88e0e0d (patch)
tree17c39ecf0b973dd3b74db3b02d678944eac649d9 /worker.c
parentf3b6712e04fd993caccaa18e425639d85d81b1eb (diff)
downloadrspamd-a4473aedcb7c49b494112bce63d06c98d88e0e0d.tar.gz
rspamd-a4473aedcb7c49b494112bce63d06c98d88e0e0d.zip
* Add implementation of save point for async events in rspamd filters
Diffstat (limited to 'worker.c')
-rw-r--r--worker.c107
1 files changed, 92 insertions, 15 deletions
diff --git a/worker.c b/worker.c
index 84c9ef763..ac9dea9db 100644
--- a/worker.c
+++ b/worker.c
@@ -193,47 +193,82 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data)
}
}
-static void
+int
process_filters (struct worker_task *task)
{
struct filter_result *res = NULL;
struct chain_result *chain_res = NULL;
- struct c_module *c_filter;
- struct filter_chain *chain;
- struct script_param *perl_script;
+ struct c_module *c_filter = NULL;
+ struct filter_chain *chain = NULL;
+ struct script_param *perl_script = NULL;
int i = 0;
/* First process C modules */
- LIST_FOREACH (c_filter, &task->cfg->c_modules, next) {
+ if (task->save.saved == 1) {
+ if (task->save.save_type == C_FILTER) {
+ task->save.saved = 0;
+ c_filter = (struct c_module *)task->save.entry;
+ }
+ else if (task->save.save_type == PERL_FILTER) {
+ chain = (struct filter_chain *)task->save.chain;
+ perl_script = (struct script_param *)task->save.entry;
+ task->save.saved = 0;
+ }
+ }
+ else {
+ c_filter = LIST_FIRST (&task->cfg->c_modules);
+ chain = LIST_FIRST (&task->cfg->filters);
+ if (chain) {
+ perl_script = LIST_FIRST (chain->scripts);
+ }
+ }
+ while (c_filter != NULL) {
res = malloc (sizeof (struct filter_result));
if (res == NULL) {
msg_err ("process_filters: malloc failed, %m");
- return;
+ return -1;
}
res->chain = NULL;
res->symbol = c_filter->name;
res->mark = 0;
if (c_filter->ctx->header_filter != NULL) {
res->mark += c_filter->ctx->header_filter (task);
+ if (task->save.saved == 1) {
+ task->save.save_type = C_FILTER;
+ goto save_point;
+ }
}
if (c_filter->ctx->message_filter != NULL) {
res->mark += c_filter->ctx->message_filter (task);
+ if (task->save.saved == 1) {
+ task->save.save_type = C_FILTER;
+ goto save_point;
+ }
}
if (c_filter->ctx->mime_filter != NULL) {
res->mark += c_filter->ctx->mime_filter (task);
+ if (task->save.saved == 1) {
+ task->save.save_type = C_FILTER;
+ goto save_point;
+ }
}
if (c_filter->ctx->url_filter != NULL) {
res->mark += c_filter->ctx->url_filter (task);
+ if (task->save.saved == 1) {
+ task->save.save_type = C_FILTER;
+ goto save_point;
+ }
}
TAILQ_INSERT_TAIL (&task->results, res, next);
+ c_filter = LIST_NEXT (c_filter, next);
}
/* Process perl chains */
- LIST_FOREACH (chain, &task->cfg->filters, next) {
+ while (chain != NULL) {
chain_res = malloc (sizeof (struct chain_result));
if (chain_res == NULL) {
msg_err ("process_filters: malloc failed, %m");
- return;
+ return -1;
}
i = 0;
chain_res->chain = chain;
@@ -242,9 +277,9 @@ process_filters (struct worker_task *task)
if (chain_res->marks == NULL) {
free (chain_res);
msg_err ("process_filters: malloc failed, %m");
- return;
+ return -1;
}
- LIST_FOREACH (perl_script, chain->scripts, next) {
+ while (perl_script != NULL) {
if (perl_script->type == SCRIPT_CHAIN) {
/* Skip chain filters first */
continue;
@@ -252,7 +287,7 @@ process_filters (struct worker_task *task)
res = malloc (sizeof (struct filter_result));
if (res == NULL) {
msg_err ("process_filters: malloc failed, %m");
- return;
+ return -1;
}
res->chain = chain;
res->symbol = perl_script->symbol;
@@ -260,22 +295,40 @@ process_filters (struct worker_task *task)
switch (perl_script->type) {
case SCRIPT_HEADER:
res->mark += perl_call_header_filter (perl_script->function, task);
+ if (task->save.saved == 1) {
+ task->save.save_type = PERL_FILTER;
+ goto save_point;
+ }
break;
case SCRIPT_MESSAGE:
res->mark += perl_call_message_filter (perl_script->function, task);
+ if (task->save.saved == 1) {
+ task->save.save_type = PERL_FILTER;
+ goto save_point;
+ }
break;
case SCRIPT_MIME:
res->mark += perl_call_mime_filter (perl_script->function, task);
+ if (task->save.saved == 1) {
+ task->save.save_type = PERL_FILTER;
+ goto save_point;
+ }
break;
case SCRIPT_URL:
res->mark += perl_call_url_filter (perl_script->function, task);
+ if (task->save.saved == 1) {
+ task->save.save_type = PERL_FILTER;
+ goto save_point;
+ }
break;
}
TAILQ_INSERT_TAIL (&task->results, res, next);
chain_res->marks[i++] = res->mark;
+ perl_script = LIST_NEXT (perl_script, next);
}
chain_res->marks_num = i;
TAILQ_INSERT_TAIL (&task->chain_results, chain_res, next);
+ chain = LIST_NEXT (chain, next);
}
/* Now process chain results */
@@ -297,9 +350,23 @@ process_filters (struct worker_task *task)
}
}
}
+
+ task->state = WRITE_REPLY;
+ bufferevent_enable (task->bev, EV_WRITE);
+ return 0;
+
+save_point:
+ if (task->save.save_type == C_FILTER) {
+ task->save.entry = LIST_NEXT (c_filter, next);
+ }
+ else if (task->save.save_type == PERL_FILTER) {
+ task->save.chain = LIST_NEXT (chain, next);
+ task->save.entry = LIST_NEXT (perl_script, next);
+ }
+ return 1;
}
-static void
+static int
process_message (struct worker_task *task)
{
GMimeMessage *message;
@@ -327,7 +394,7 @@ process_message (struct worker_task *task)
msg_info ("process_message: found %d parts in message", task->parts_count);
- process_filters (task);
+ return process_filters (task);
}
static void
@@ -448,8 +515,13 @@ read_socket (struct bufferevent *bev, void *arg)
task->msg->pos += r;
update_buf_size (task->msg);
if (task->msg->free == 0) {
- process_message (task);
- task->state = WRITE_REPLY;
+ r = process_message (task);
+ if (r == -1) {
+ task->state = WRITE_ERROR;
+ }
+ else if (r == 1) {
+ task->state = WAIT_FILTER;
+ }
}
}
else {
@@ -459,6 +531,10 @@ read_socket (struct bufferevent *bev, void *arg)
free_task (task);
}
break;
+ case WAIT_FILTER:
+ bufferevent_disable (bev, EV_READ);
+ bufferevent_disable (bev, EV_READ);
+ break;
case WRITE_REPLY:
r = bufferevent_write (bev, "Ok\r\n", sizeof ("Ok\r\n") - 1);
bufferevent_disable (bev, EV_READ);
@@ -518,6 +594,7 @@ accept_socket (int fd, short what, void *arg)
msg_err ("accept_socket: cannot allocate memory for task, %m");
return;
}
+ bzero (new_task, sizeof (struct worker_task));
new_task->worker = worker;
new_task->state = READ_COMMAND;
new_task->content_length = 0;