* Perl should use separate memcached context for its operations (just do memcpy with the same socket)
TODO: add block mechanics here to avoid memcached connection closing before perl operation is finished
* Change logic of perl chain filter
#ifndef CFG_FILE_H
#define CFG_FILE_H
+#include "config.h"
#include <sys/types.h>
#ifndef OWN_QUEUE_H
#include <sys/queue.h>
#include <glib.h>
#include "upstream.h"
#include "memcached.h"
+#include "main.h"
#define DEFAULT_BIND_PORT 768
#define MAX_MEMCACHED_SERVERS 48
SCRIPT_CHAIN,
};
-struct uri;
-
struct memcached_server {
struct upstream up;
struct in_addr addr;
LIST_ENTRY (perl_module) next;
};
-struct module_ctx {
- int (*header_filter)(const char *header_name, const char *header_value);
- int (*mime_filter)(GByteArray *content);
- int (*message_filter)(GByteArray *content);
- int (*uri_filter)(struct uri *uri);
- int (*chain_filter)(GArray *results);
-};
-
-struct c_module {
- const char *name;
- struct module_ctx *ctx;
- LIST_ENTRY (c_module) next;
-};
-
struct script_param {
char *symbol;
char *function;
struct filter_chain {
unsigned int metric;
+ unsigned int scripts_number;
LIST_HEAD (scriptq, script_param) *scripts;
LIST_ENTRY (filter_chain) next;
};
extern char *yytext;
struct scriptq *cur_scripts;
+unsigned int cur_scripts_num = 0;
%}
cur_chain->metric = $1;
cur_chain->scripts = cur_scripts;
+ cur_chain->scripts_number = cur_scripts_num;
LIST_INSERT_HEAD (&cfg->filters, cur_chain, next);
}
YYERROR;
}
LIST_INSERT_HEAD (cur_scripts, $1, next);
+ cur_scripts_num = 1;
}
| filter_chain filter_param SEMICOLON {
if ($2 == NULL) {
YYERROR;
}
LIST_INSERT_HEAD (cur_scripts, $2, next);
+ cur_scripts_num ++;
}
;
struct config_file;
struct filter_chain;
+
/* Struct that determine main server object (for logging purposes) */
struct rspamd_main {
struct config_file *cfg;
TAILQ_ENTRY (filter_result) next;
};
+struct chain_result {
+ struct filter_chain *chain;
+ int *marks;
+ unsigned int marks_num;
+ int result_mark;
+ TAILQ_ENTRY (chain_result) next;
+};
+
struct mime_part {
GMimeContentType *type;
GByteArray *content;
TAILQ_HEAD (uriq, uri) urls;
/* List of filter results */
TAILQ_HEAD (resultsq, filter_result) results;
+ /* Results of all chains */
+ TAILQ_HEAD (chainsq, chain_result) chain_results;
+ struct config_file *cfg;
+};
+
+struct module_ctx {
+ int (*header_filter)(struct worker_task *task);
+ int (*mime_filter)(struct worker_task *task);
+ int (*message_filter)(struct worker_task *task);
+ int (*url_filter)(struct worker_task *task);
+};
+
+struct c_module {
+ const char *name;
+ struct module_ctx *ctx;
+ LIST_ENTRY (c_module) next;
};
void start_worker (struct rspamd_worker *worker, int listen_sock);
}
int
-perl_call_chain_filter (const char *function, struct worker_task *task)
+perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number)
{
- int result;
+ int result, i;
+ AV *av;
dSP;
-
+
ENTER;
SAVETMPS;
+ av = newAV();
+ av_extend (av, number);
+ for (i = 0; i < number; i ++) {
+ av_push (av, sv_2mortal (newSViv (marks[i])));
+ }
PUSHMARK (SP);
XPUSHs (sv_2mortal (newSViv (PTR2IV (task))));
+ XPUSHs (AvARRAY (av));
PUTBACK;
call_pv (function, G_SCALAR);
PUTBACK;
FREETMPS;
+ av_undef (av);
LEAVE;
PUTBACK;
call_sv (callback_data->callback, G_SCALAR);
+
+ free (callback_data);
+ free (ctx);
SPAGAIN;
FREETMPS;
int perl_call_mime_filter (const char *function, struct worker_task *task);
int perl_call_message_filter (const char *function, struct worker_task *task);
int perl_call_url_filter (const char *function, struct worker_task *task);
-int perl_call_chain_filter (const char *function, struct worker_task *task);
+int perl_call_chain_filter (const char *function, struct worker_task *task, int *marks, unsigned int number);
void perl_call_memcached_callback (memcached_ctx_t *ctx, memc_error_t error, void *data);
SV *callback;
struct worker_task *task;
} *callback_data;
+ memcached_ctx_t *ctx;
memcached_param_t param;
perl_set_session (r);
datalen = (unsigned int) SvIV (ST(2));
callback = SvRV(ST(3));
- r->memc_ctx->callback = perl_call_memcached_callback;
+ /* Copy old ctx to new one */
+ ctx = malloc (sizeof (memcached_ctx_t));
+ if (ctx == NULL) {
+ XSRETURN_UNDEF;
+ }
+ memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t));
+ /* Set perl callback */
+ ctx->callback = perl_call_memcached_callback;
callback_data = malloc (sizeof (struct _param));
if (callback_data == NULL) {
XSRETURN_UNDEF;
}
callback_data->callback = callback;
callback_data->task = r;
- r->memc_ctx->callback_data = (void *)callback_data;
-
- r->memc_busy = 1;
+ ctx->callback_data = (void *)callback_data;
strlcpy (param.key, key, sizeof (param.key));
param.buf = malloc (datalen);
param.bufpos = 0;
param.expire = 0;
- memc_get (r->memc_ctx, ¶m);
+ memc_get (ctx, ¶m);
XSRETURN_EMPTY;
void
SV *callback;
struct worker_task *task;
} *callback_data;
+ memcached_ctx_t *ctx;
memcached_param_t param;
perl_set_session (r);
expire = (int) SvIV (ST(3));
callback = SvRV(ST(4));
- r->memc_ctx->callback = perl_call_memcached_callback;
+ /* Copy old ctx to new one */
+ ctx = malloc (sizeof (memcached_ctx_t));
+ if (ctx == NULL) {
+ XSRETURN_UNDEF;
+ }
+ memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t));
+ /* Set perl callback */
+ ctx->callback = perl_call_memcached_callback;
callback_data = malloc (sizeof (struct _param));
if (callback_data == NULL) {
XSRETURN_UNDEF;
}
callback_data->callback = callback;
callback_data->task = r;
- r->memc_ctx->callback_data = (void *)callback_data;
-
- r->memc_busy = 1;
+ ctx->callback_data = (void *)callback_data;
strlcpy (param.key, key, sizeof (param.key));
param.buf = data;
param.bufpos = 0;
param.expire = expire;
- memc_set (r->memc_ctx, ¶m, expire);
+ memc_set (ctx, ¶m, expire);
XSRETURN_EMPTY;
void
SV *callback;
struct worker_task *task;
} *callback_data;
+ memcached_ctx_t *ctx;
memcached_param_t param;
perl_set_session (r);
key = (char *) SvPV (ST(1), keylen);
callback = SvRV(ST(2));
- r->memc_ctx->callback = perl_call_memcached_callback;
+ /* Copy old ctx to new one */
+ ctx = malloc (sizeof (memcached_ctx_t));
+ if (ctx == NULL) {
+ XSRETURN_UNDEF;
+ }
+ memcpy (ctx, r->memc_ctx, sizeof (memcached_ctx_t));
+ /* Set perl callback */
+ ctx->callback = perl_call_memcached_callback;
callback_data = malloc (sizeof (struct _param));
if (callback_data == NULL) {
XSRETURN_UNDEF;
}
callback_data->callback = callback;
callback_data->task = r;
- r->memc_ctx->callback_data = (void *)callback_data;
-
- r->memc_busy = 1;
+ ctx->callback_data = (void *)callback_data;
strlcpy (param.key, key, sizeof (param.key));
param.buf = NULL;
param.bufpos = 0;
param.expire = 0;
- memc_delete (r->memc_ctx, ¶m);
+ memc_delete (ctx, ¶m);
XSRETURN_EMPTY;
{
struct uri *cur;
struct filter_result *res;
+ struct chain_result *chain_res;
struct mime_part *part;
if (task) {
}
while (!TAILQ_EMPTY (&task->results)) {
res = TAILQ_FIRST (&task->results);
- free (res);
TAILQ_REMOVE (&task->results, res, next);
+ free (res);
+ }
+ while (!TAILQ_EMPTY (&task->chain_results)) {
+ chain_res = TAILQ_FIRST (&task->chain_results);
+ if (chain_res->marks != NULL) {
+ free (chain_res->marks);
+ }
+ TAILQ_REMOVE (&task->chain_results, chain_res, next);
+ free (chain_res);
}
+
while (!TAILQ_EMPTY (&task->parts)) {
part = TAILQ_FIRST (&task->parts);
g_object_unref (part->type);
{
struct worker_task *task = (struct worker_task *)user_data;
struct mime_part *mime_part;
- const GMimeContentType *type;
+ GMimeContentType *type;
GMimeDataWrapper *wrapper;
GMimeStream *part_stream;
GByteArray *part_content;
part_stream = g_mime_stream_mem_new ();
if (g_mime_data_wrapper_write_to_stream (wrapper, part_stream) != -1) {
part_content = g_mime_stream_mem_get_byte_array (GMIME_STREAM_MEM (part_stream));
- type = g_mime_part_get_content_type (GMIME_PART (part));
+ type = (GMimeContentType *)g_mime_part_get_content_type (GMIME_PART (part));
mime_part = g_malloc (sizeof (struct mime_part));
mime_part->type = type;
mime_part->content = part_content;
}
}
+static void
+process_filters (struct worker_task *task)
+{
+ struct filter_result *res = NULL;
+ struct chain_result *chain_res = NULL;
+ struct c_module *c_filter;
+ struct filter_chain *chain;
+ struct script_param *perl_script;
+ int i = 0;
+
+ /* First process C modules */
+ LIST_FOREACH (c_filter, &task->cfg->c_modules, next) {
+ res = malloc (sizeof (struct filter_result));
+ if (res == NULL) {
+ msg_err ("process_filters: malloc failed, %m");
+ return;
+ }
+ res->chain = NULL;
+ res->symbol = c_filter->name;
+ res->mark = 0;
+ if (c_filter->ctx->header_filter != NULL) {
+ res->mark += c_filter->ctx->header_filter (task);
+ }
+ if (c_filter->ctx->message_filter != NULL) {
+ res->mark += c_filter->ctx->message_filter (task);
+ }
+ if (c_filter->ctx->mime_filter != NULL) {
+ res->mark += c_filter->ctx->mime_filter (task);
+ }
+ if (c_filter->ctx->url_filter != NULL) {
+ res->mark += c_filter->ctx->url_filter (task);
+ }
+ TAILQ_INSERT_TAIL (&task->results, res, next);
+ }
+
+ /* Process perl chains */
+ LIST_FOREACH (chain, &task->cfg->filters, next) {
+ chain_res = malloc (sizeof (struct chain_result));
+ if (chain_res == NULL) {
+ msg_err ("process_filters: malloc failed, %m");
+ return;
+ }
+ i = 0;
+ chain_res->chain = chain;
+ chain_res->marks = malloc (sizeof (int) * chain->scripts_number);
+ chain_res->result_mark = 0;
+ if (chain_res->marks == NULL) {
+ free (chain_res);
+ msg_err ("process_filters: malloc failed, %m");
+ return;
+ }
+ LIST_FOREACH (perl_script, chain->scripts, next) {
+ if (perl_script->type == SCRIPT_CHAIN) {
+ /* Skip chain filters first */
+ continue;
+ }
+ res = malloc (sizeof (struct filter_result));
+ if (res == NULL) {
+ msg_err ("process_filters: malloc failed, %m");
+ return;
+ }
+ res->chain = chain;
+ res->symbol = perl_script->symbol;
+ res->mark = 0;
+ switch (perl_script->type) {
+ case SCRIPT_HEADER:
+ res->mark += perl_call_header_filter (perl_script->function, task);
+ break;
+ case SCRIPT_MESSAGE:
+ res->mark += perl_call_message_filter (perl_script->function, task);
+ break;
+ case SCRIPT_MIME:
+ res->mark += perl_call_mime_filter (perl_script->function, task);
+ break;
+ case SCRIPT_URL:
+ res->mark += perl_call_url_filter (perl_script->function, task);
+ break;
+ }
+ TAILQ_INSERT_TAIL (&task->results, res, next);
+ chain_res->marks[i++] = res->mark;
+ }
+ chain_res->marks_num = i;
+ TAILQ_INSERT_TAIL (&task->chain_results, chain_res, next);
+ }
+
+ /* Now process chain results */
+ TAILQ_FOREACH (chain_res, &task->chain_results, next) {
+ i = 0;
+ LIST_FOREACH (perl_script, chain_res->chain->scripts, next) {
+ if (perl_script->type != SCRIPT_CHAIN) {
+ /* Skip not chain filters */
+ continue;
+ }
+ /* Increment i; if i would be equal to zero that would mean that this chain has no chain filter script */
+ i ++;
+ chain_res->result_mark += perl_call_url_filter (perl_script->function, task, chain_res->marks, chain_res->marks_num);
+ }
+ /* If chain has no chain filter, just do addition of all marks */
+ if (i == 0) {
+ for (i = 0; i < chain_res->marks_num; i++) {
+ chain_res->result_mark += chain_res->marks[i];
+ }
+ }
+ }
+}
static void
process_message (struct worker_task *task)
g_mime_message_foreach_part (message, mime_foreach_callback, task);
msg_info ("process_message: found %d parts in message", task->parts_count);
+
+ process_filters (task);
}
static void
new_task->state = READ_COMMAND;
new_task->content_length = 0;
new_task->parts_count = 0;
+ new_task->cfg = worker->srv->cfg;
TAILQ_INIT (&new_task->urls);
TAILQ_INIT (&new_task->results);
TAILQ_INIT (&new_task->parts);
start_worker (struct rspamd_worker *worker, int listen_sock)
{
struct sigaction signals;
- struct config_file *cfg = worker->srv->cfg;
worker->srv->pid = getpid ();
worker->srv->type = TYPE_WORKER;
event_init ();