aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@rambler-co.ru>2008-08-21 17:58:30 +0400
committerVsevolod Stakhov <vsevolod@rambler-co.ru>2008-08-21 17:58:30 +0400
commitf3b6712e04fd993caccaa18e425639d85d81b1eb (patch)
treeaaa504a733d1c442692a8e25ad5680657de71942
parente90352d20a0d5f615c906b7719f95599cb2aaeac (diff)
downloadrspamd-f3b6712e04fd993caccaa18e425639d85d81b1eb.tar.gz
rspamd-f3b6712e04fd993caccaa18e425639d85d81b1eb.zip
* Add filter processing logic
* Perl should use a separate memcached context for its operations (just memcpy the existing context, so both share the same socket). TODO: add a blocking mechanism here so the memcached connection is not closed before the Perl operation has finished
* Change the logic of the Perl chain filter
-rw-r--r--cfg_file.h19
-rw-r--r--cfg_file.y4
-rw-r--r--main.h25
-rw-r--r--perl.c17
-rw-r--r--perl.h2
-rw-r--r--perl/rspamd.xs48
-rw-r--r--worker.c125
7 files changed, 201 insertions, 39 deletions
diff --git a/cfg_file.h b/cfg_file.h
index bf532c588..62856b489 100644
--- a/cfg_file.h
+++ b/cfg_file.h
@@ -6,6 +6,7 @@
#ifndef CFG_FILE_H
#define CFG_FILE_H
+#include "config.h"
#include <sys/types.h>
#ifndef OWN_QUEUE_H
#include <sys/queue.h>
@@ -18,6 +19,7 @@
#include <glib.h>
#include "upstream.h"
#include "memcached.h"
+#include "main.h"
#define DEFAULT_BIND_PORT 768
#define MAX_MEMCACHED_SERVERS 48
@@ -55,8 +57,6 @@ enum script_type {
SCRIPT_CHAIN,
};
-struct uri;
-
struct memcached_server {
struct upstream up;
struct in_addr addr;
@@ -70,20 +70,6 @@ struct perl_module {
LIST_ENTRY (perl_module) next;
};
-struct module_ctx {
- int (*header_filter)(const char *header_name, const char *header_value);
- int (*mime_filter)(GByteArray *content);
- int (*message_filter)(GByteArray *content);
- int (*uri_filter)(struct uri *uri);
- int (*chain_filter)(GArray *results);
-};
-
-struct c_module {
- const char *name;
- struct module_ctx *ctx;
- LIST_ENTRY (c_module) next;
-};
-
struct script_param {
char *symbol;
char *function;
@@ -93,6 +79,7 @@ struct script_param {
struct filter_chain {
unsigned int metric;
+ unsigned int scripts_number;
LIST_HEAD (scriptq, script_param) *scripts;
LIST_ENTRY (filter_chain) next;
};
diff --git a/cfg_file.y b/cfg_file.y
index 0c6038f60..91f84f4ce 100644
--- a/cfg_file.y
+++ b/cfg_file.y
@@ -25,6 +25,7 @@ extern int yylineno;
extern char *yytext;
struct scriptq *cur_scripts;
+unsigned int cur_scripts_num = 0;
%}
@@ -226,6 +227,7 @@ filterbody:
cur_chain->metric = $1;
cur_chain->scripts = cur_scripts;
+ cur_chain->scripts_number = cur_scripts_num;
LIST_INSERT_HEAD (&cfg->filters, cur_chain, next);
}
@@ -250,6 +252,7 @@ filter_chain:
YYERROR;
}
LIST_INSERT_HEAD (cur_scripts, $1, next);
+ cur_scripts_num = 1;
}
| filter_chain filter_param SEMICOLON {
if ($2 == NULL) {
@@ -257,6 +260,7 @@ filter_chain:
YYERROR;
}
LIST_INSERT_HEAD (cur_scripts, $2, next);
+ cur_scripts_num ++;
}
;
diff --git a/main.h b/main.h
index cb98167ae..fdf22b8ba 100644
--- a/main.h
+++ b/main.h
@@ -59,6 +59,7 @@ struct pidfh;
struct config_file;
struct filter_chain;
+
/* Struct that determine main server object (for logging purposes) */
struct rspamd_main {
struct config_file *cfg;
@@ -78,6 +79,14 @@ struct filter_result {
TAILQ_ENTRY (filter_result) next;
};
+struct chain_result { /* Outcome of running one filter chain over a task */
+ struct filter_chain *chain; /* chain these marks belong to */
+ int *marks; /* heap array of per-script marks; released by free_task */
+ unsigned int marks_num; /* number of entries actually stored in marks */
+ int result_mark; /* combined mark: from chain filter scripts, or the sum of marks if there are none */
+ TAILQ_ENTRY (chain_result) next; /* linkage in worker_task.chain_results */
+};
+
struct mime_part {
GMimeContentType *type;
GByteArray *content;
@@ -114,6 +123,22 @@ struct worker_task {
TAILQ_HEAD (uriq, uri) urls;
/* List of filter results */
TAILQ_HEAD (resultsq, filter_result) results;
+ /* Results of all chains */
+ TAILQ_HEAD (chainsq, chain_result) chain_results;
+ struct config_file *cfg;
+};
+
+struct module_ctx { /* Hook table of a C filter module; process_filters skips NULL hooks and sums the returned ints into the module's mark */
+ int (*header_filter)(struct worker_task *task); /* called first by process_filters */
+ int (*mime_filter)(struct worker_task *task); /* called third by process_filters */
+ int (*message_filter)(struct worker_task *task); /* called second by process_filters */
+ int (*url_filter)(struct worker_task *task); /* called last by process_filters */
+};
+
+struct c_module { /* One registered C filter module */
+ const char *name; /* stored as the symbol of the filter_result produced for this module */
+ struct module_ctx *ctx; /* hooks invoked by process_filters */
+ LIST_ENTRY (c_module) next; /* linkage in the config's c_modules list */
};
void start_worker (struct rspamd_worker *worker, int listen_sock);
diff --git a/perl.c b/perl.c
index e2ae086eb..2cdfacf55 100644
--- a/perl.c
+++ b/perl.c
@@ -125,16 +125,23 @@ perl_call_url_filter (const char *function, struct worker_task *task)
}
int
-perl_call_chain_filter (const char *function, struct worker_task *task)
+perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number)
{
- int result;
+ int result, i;
+ AV *av;
dSP;
-
+
ENTER;
SAVETMPS;
+ av = newAV();
+ av_extend (av, number);
+ for (i = 0; i < number; i ++) {
+ av_push (av, sv_2mortal (newSViv (marks[i])));
+ }
PUSHMARK (SP);
XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
+ XPUSHs (AvARRAY (av));
PUTBACK;
call_pv (function, G_SCALAR);
@@ -146,6 +153,7 @@ perl_call_chain_filter (const char *function, struct worker_task *task)
PUTBACK;
FREETMPS;
+ av_undef (av);
LEAVE;
@@ -170,6 +178,9 @@ void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, voi
PUTBACK;
call_sv (callback_data->callback, G_SCALAR);
+
+ free (callback_data);
+ free (ctx);
SPAGAIN;
FREETMPS;
diff --git a/perl.h b/perl.h
index fc2ae9321..9a37634e3 100644
--- a/perl.h
+++ b/perl.h
@@ -12,7 +12,7 @@ int perl_call_header_filter (const char *function, struct worker_task *task);
int perl_call_mime_filter (const char *function, struct worker_task *task);
int perl_call_message_filter (const char *function, struct worker_task *task);
int perl_call_url_filter (const char *function, struct worker_task *task);
-int perl_call_chain_filter (const char *function, struct worker_task *task);
+int perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number);
void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data);
diff --git a/perl/rspamd.xs b/perl/rspamd.xs
index 681e40308..4ec40f0fb 100644
--- a/perl/rspamd.xs
+++ b/perl/rspamd.xs
@@ -111,6 +111,7 @@ read_memcached_key (r, key, datalen, callback)
SV *callback;
struct worker_task *task;
} *callback_data;
+ memcached_ctx_t *ctx;
memcached_param_t param;
perl_set_session (r);
@@ -118,16 +119,21 @@ read_memcached_key (r, key, datalen, callback)
datalen = (unsigned int) SvIV (ST(2));
callback = SvRV(ST(3));
- r->memc_ctx->callback = perl_call_memcached_callback;
+ /* Copy old ctx to new one */
+ ctx = malloc (sizeof (memcached_ctx_t));
+ if (ctx == NULL) {
+ XSRETURN_UNDEF;
+ }
+ memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t));
+ /* Set perl callback */
+ ctx->callback = perl_call_memcached_callback;
callback_data = malloc (sizeof (struct _param));
if (callback_data == NULL) {
XSRETURN_UNDEF;
}
callback_data->callback = callback;
callback_data->task = r;
- r->memc_ctx->callback_data = (void *)callback_data;
-
- r->memc_busy = 1;
+ ctx->callback_data = (void *)callback_data;
strlcpy (param.key, key, sizeof (param.key));
param.buf = malloc (datalen);
@@ -137,7 +143,7 @@ read_memcached_key (r, key, datalen, callback)
param.bufpos = 0;
param.expire = 0;
- memc_get (r->memc_ctx, &param);
+ memc_get (ctx, &param);
XSRETURN_EMPTY;
void
@@ -152,6 +158,7 @@ write_memcached_key (r, key, data, expire, callback)
SV *callback;
struct worker_task *task;
} *callback_data;
+ memcached_ctx_t *ctx;
memcached_param_t param;
perl_set_session (r);
@@ -160,16 +167,21 @@ write_memcached_key (r, key, data, expire, callback)
expire = (int) SvIV (ST(3));
callback = SvRV(ST(4));
- r->memc_ctx->callback = perl_call_memcached_callback;
+ /* Copy old ctx to new one */
+ ctx = malloc (sizeof (memcached_ctx_t));
+ if (ctx == NULL) {
+ XSRETURN_UNDEF;
+ }
+ memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t));
+ /* Set perl callback */
+ ctx->callback = perl_call_memcached_callback;
callback_data = malloc (sizeof (struct _param));
if (callback_data == NULL) {
XSRETURN_UNDEF;
}
callback_data->callback = callback;
callback_data->task = r;
- r->memc_ctx->callback_data = (void *)callback_data;
-
- r->memc_busy = 1;
+ ctx->callback_data = (void *)callback_data;
strlcpy (param.key, key, sizeof (param.key));
param.buf = data;
@@ -177,7 +189,7 @@ write_memcached_key (r, key, data, expire, callback)
param.bufpos = 0;
param.expire = expire;
- memc_set (r->memc_ctx, &param, expire);
+ memc_set (ctx, &param, expire);
XSRETURN_EMPTY;
void
@@ -191,22 +203,28 @@ delete_memcached_key (r, key, callback)
SV *callback;
struct worker_task *task;
} *callback_data;
+ memcached_ctx_t *ctx;
memcached_param_t param;
perl_set_session (r);
key = (char *) SvPV (ST(1), keylen);
callback = SvRV(ST(2));
- r->memc_ctx->callback = perl_call_memcached_callback;
+ /* Copy old ctx to new one */
+ ctx = malloc (sizeof (memcached_ctx_t));
+ if (ctx == NULL) {
+ XSRETURN_UNDEF;
+ }
+ memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t));
+ /* Set perl callback */
+ ctx->callback = perl_call_memcached_callback;
callback_data = malloc (sizeof (struct _param));
if (callback_data == NULL) {
XSRETURN_UNDEF;
}
callback_data->callback = callback;
callback_data->task = r;
- r->memc_ctx->callback_data = (void *)callback_data;
-
- r->memc_busy = 1;
+ ctx->callback_data = (void *)callback_data;
strlcpy (param.key, key, sizeof (param.key));
param.buf = NULL;
@@ -214,6 +232,6 @@ delete_memcached_key (r, key, callback)
param.bufpos = 0;
param.expire = 0;
- memc_delete (r->memc_ctx, &param);
+ memc_delete (ctx, &param);
XSRETURN_EMPTY;
diff --git a/worker.c b/worker.c
index d45284392..84c9ef763 100644
--- a/worker.c
+++ b/worker.c
@@ -73,6 +73,7 @@ free_task (struct worker_task *task)
{
struct uri *cur;
struct filter_result *res;
+ struct chain_result *chain_res;
struct mime_part *part;
if (task) {
@@ -104,9 +105,18 @@ free_task (struct worker_task *task)
}
while (!TAILQ_EMPTY (&task->results)) {
res = TAILQ_FIRST (&task->results);
- free (res);
TAILQ_REMOVE (&task->results, res, next);
+ free (res);
+ }
+ while (!TAILQ_EMPTY (&task->chain_results)) {
+ chain_res = TAILQ_FIRST (&task->chain_results);
+ if (chain_res->marks != NULL) {
+ free (chain_res->marks);
+ }
+ TAILQ_REMOVE (&task->chain_results, chain_res, next);
+ free (chain_res);
}
+
while (!TAILQ_EMPTY (&task->parts)) {
part = TAILQ_FIRST (&task->parts);
g_object_unref (part->type);
@@ -123,7 +133,7 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data)
{
struct worker_task *task = (struct worker_task *)user_data;
struct mime_part *mime_part;
- const GMimeContentType *type;
+ GMimeContentType *type;
GMimeDataWrapper *wrapper;
GMimeStream *part_stream;
GByteArray *part_content;
@@ -165,7 +175,7 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data)
part_stream = g_mime_stream_mem_new ();
if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) {
part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream));
- type = g_mime_part_get_content_type (GMIME_PART (part));
+ type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part));
mime_part = g_malloc (sizeof (struct mime_part));
mime_part->type = type;
mime_part->content = part_content;
@@ -183,6 +193,111 @@ mime_foreach_callback (GMimeObject *part, gpointer user_data)
}
}
+/*
+ * Run every configured filter over a task and record the resulting marks.
+ *
+ * Three passes are made:
+ *  1. Each C module in task->cfg->c_modules is called through its non-NULL
+ *     hooks; the summed mark is appended to task->results.
+ *  2. For each chain in task->cfg->filters every non-chain perl script is
+ *     called; its mark is appended to task->results and also stored in a
+ *     per-chain marks array that is appended to task->chain_results.
+ *  3. For each collected chain result the SCRIPT_CHAIN scripts are called
+ *     with the marks array and their return values are accumulated in
+ *     result_mark.  A chain without chain scripts gets the plain sum of
+ *     its marks instead.
+ *
+ * On allocation failure the error is logged and processing stops; entries
+ * already queued on the task remain there, while objects that were not yet
+ * queued are released before returning.
+ *
+ * NOTE(review): this relies on task->chain_results having been initialised
+ * with TAILQ_INIT before the call; the visible accept_socket hunk only shows
+ * urls, results and parts being initialised - confirm.
+ */
+static void
+process_filters (struct worker_task *task)
+{
+	struct filter_result *res = NULL;
+	struct chain_result *chain_res = NULL;
+	struct c_module *c_filter;
+	struct filter_chain *chain;
+	struct script_param *perl_script;
+	unsigned int i = 0;
+	int has_chain_filter;
+
+	/* First process C modules */
+	LIST_FOREACH (c_filter, &task->cfg->c_modules, next) {
+		res = malloc (sizeof (struct filter_result));
+		if (res == NULL) {
+			msg_err ("process_filters: malloc failed, %m");
+			return;
+		}
+		res->chain = NULL;
+		res->symbol = c_filter->name;
+		res->mark = 0;
+		if (c_filter->ctx->header_filter != NULL) {
+			res->mark += c_filter->ctx->header_filter (task);
+		}
+		if (c_filter->ctx->message_filter != NULL) {
+			res->mark += c_filter->ctx->message_filter (task);
+		}
+		if (c_filter->ctx->mime_filter != NULL) {
+			res->mark += c_filter->ctx->mime_filter (task);
+		}
+		if (c_filter->ctx->url_filter != NULL) {
+			res->mark += c_filter->ctx->url_filter (task);
+		}
+		TAILQ_INSERT_TAIL (&task->results, res, next);
+	}
+
+	/* Process perl chains */
+	LIST_FOREACH (chain, &task->cfg->filters, next) {
+		chain_res = malloc (sizeof (struct chain_result));
+		if (chain_res == NULL) {
+			msg_err ("process_filters: malloc failed, %m");
+			return;
+		}
+		i = 0;
+		chain_res->chain = chain;
+		chain_res->marks = malloc (sizeof (int) * chain->scripts_number);
+		chain_res->result_mark = 0;
+		if (chain_res->marks == NULL) {
+			free (chain_res);
+			msg_err ("process_filters: malloc failed, %m");
+			return;
+		}
+		LIST_FOREACH (perl_script, chain->scripts, next) {
+			if (perl_script->type == SCRIPT_CHAIN) {
+				/* Chain filters run in the last pass, once all marks are known */
+				continue;
+			}
+			res = malloc (sizeof (struct filter_result));
+			if (res == NULL) {
+				msg_err ("process_filters: malloc failed, %m");
+				/* chain_res is not queued yet, so free_task would never see it */
+				free (chain_res->marks);
+				free (chain_res);
+				return;
+			}
+			res->chain = chain;
+			res->symbol = perl_script->symbol;
+			res->mark = 0;
+			switch (perl_script->type) {
+			case SCRIPT_HEADER:
+				res->mark += perl_call_header_filter (perl_script->function, task);
+				break;
+			case SCRIPT_MESSAGE:
+				res->mark += perl_call_message_filter (perl_script->function, task);
+				break;
+			case SCRIPT_MIME:
+				res->mark += perl_call_mime_filter (perl_script->function, task);
+				break;
+			case SCRIPT_URL:
+				res->mark += perl_call_url_filter (perl_script->function, task);
+				break;
+			default:
+				/* SCRIPT_CHAIN was skipped above; any other type leaves the mark at 0 */
+				break;
+			}
+			TAILQ_INSERT_TAIL (&task->results, res, next);
+			/* Never write past the array even if scripts_number disagrees with the list length */
+			if (i < chain->scripts_number) {
+				chain_res->marks[i++] = res->mark;
+			}
+		}
+		chain_res->marks_num = i;
+		TAILQ_INSERT_TAIL (&task->chain_results, chain_res, next);
+	}
+
+	/* Now process chain results */
+	TAILQ_FOREACH (chain_res, &task->chain_results, next) {
+		has_chain_filter = 0;
+		LIST_FOREACH (perl_script, chain_res->chain->scripts, next) {
+			if (perl_script->type != SCRIPT_CHAIN) {
+				/* Skip non-chain filters, they were handled in the previous pass */
+				continue;
+			}
+			has_chain_filter = 1;
+			chain_res->result_mark += perl_call_chain_filter (perl_script->function, task,
+					chain_res->marks, chain_res->marks_num);
+		}
+		/* If the chain has no chain filter, just add up all marks */
+		if (!has_chain_filter) {
+			for (i = 0; i < chain_res->marks_num; i++) {
+				chain_res->result_mark += chain_res->marks[i];
+			}
+		}
+	}
+}
static void
process_message (struct worker_task *task)
@@ -211,6 +326,8 @@ process_message (struct worker_task *task)
g_mime_message_foreach_part (message, mime_foreach_callback, task);
msg_info ("process_message: found %d parts in message", task->parts_count);
+
+ process_filters (task);
}
static void
@@ -405,6 +522,7 @@ accept_socket (int fd, short what, void *arg)
new_task->state = READ_COMMAND;
new_task->content_length = 0;
new_task->parts_count = 0;
+ new_task->cfg = worker->srv->cfg;
TAILQ_INIT (&new_task->urls);
TAILQ_INIT (&new_task->results);
TAILQ_INIT (&new_task->parts);
@@ -427,7 +545,6 @@ void
start_worker (struct rspamd_worker *worker, int listen_sock)
{
struct sigaction signals;
- struct config_file *cfg = worker->srv->cfg;
worker->srv->pid = getpid ();
worker->srv->type = TYPE_WORKER;
event_init ();