Bladeren bron

* Add support of custom filters in rspamd worker

  - custom filters are dlopened and provides callbacks for user's input processing
  - custom filters can be used to extend rspamd functionality for unusual (non email processing cases)
  - custom filters allows to use rspamd async IO model and process management for performing custom network tasks
tags/0.3.0
cebka@lenovo-laptop 14 jaren geleden
bovenliggende
commit
56f520e21f
5 gewijzigde bestanden met toevoegingen van 233 en 22 verwijderingen
  1. 1
    1
      CMakeLists.txt
  2. 1
    0
      config.h.in
  3. 0
    3
      src/lmtp.c
  4. 0
    1
      src/main.h
  5. 231
    17
      src/worker.c

+ 1
- 1
CMakeLists.txt Bestand weergeven

@@ -93,7 +93,7 @@ IF(NOT LEX_EXECUTABLE OR NOT YACC_EXECUTABLE)
MESSAGE(FATAL_ERROR "Error: yacc and lex are required for build")
ENDIF(NOT LEX_EXECUTABLE OR NOT YACC_EXECUTABLE)

pkg_check_modules(GLIB2 REQUIRED glib-2.0>=2.16)
pkg_check_modules(GLIB2 REQUIRED glib-2.0>=2.16 gmodule-2.0)
pkg_check_modules(GMIME2 gmime-2.0)

# Try to link with gmime24

+ 1
- 0
config.h.in Bestand weergeven

@@ -294,6 +294,7 @@
#include <signal.h>
#include <event.h>
#include <glib.h>
#include <gmodule.h>

#ifndef NO_GMIME
#include <gmime/gmime.h>

+ 0
- 3
src/lmtp.c Bestand weergeven

@@ -98,9 +98,6 @@ free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)

if (lmtp) {
debug_task ("free pointer %p", lmtp->task);
if (lmtp->task->memc_ctx) {
memc_close_ctx (lmtp->task->memc_ctx);
}
while ((part = g_list_first (lmtp->task->parts))) {
lmtp->task->parts = g_list_remove_link (lmtp->task->parts, part);
p = (struct mime_part *)part->data;

+ 0
- 1
src/main.h Bestand weergeven

@@ -189,7 +189,6 @@ struct worker_task {
f_str_t *msg; /**< message buffer */
rspamd_io_dispatcher_t *dispatcher; /**< IO dispatcher object */
struct rspamd_async_session* s; /**< async session object */
memcached_ctx_t *memc_ctx; /**< memcached context associated with task */
int parts_count; /**< mime parts count */
GMimeMessage *message; /**< message, parsed with GMime */
InternetAddressList *rcpts; /**< list of all recipients */

+ 231
- 17
src/worker.c Bestand weergeven

@@ -50,10 +50,30 @@ extern PerlInterpreter *perl_interpreter;
# include <glib/gprintf.h>
#endif

#define MODULE_INIT_FUNC "module_init"
#define MODULE_FINIT_FUNC "module_fin"
#define MODULE_BEFORE_CONNECT_FUNC "before_connect"
#define MODULE_AFTER_CONNECT_FUNC "after_connect"
#define MODULE_PARSE_LINE_FUNC "parse_line"

struct custom_filter {
char *filename; /*< filename */
GModule *handle; /*< returned by dlopen */
void (*init_func)(void); /*< called at start of worker */
void* (*before_connect)(void); /*< called when clients connects */
gboolean (*process_line)(const char *line, size_t len, char **output, void *user_data); /*< called when client send data line */
void (*after_connect)(char **output, char **log_line, void *user_data); /*< called when client disconnects */
void (*fin_func)(void);
};

static struct timeval io_tv;
/* Detect whether this worker is mime worker */
static gboolean is_mime;

/* Detect whether this worker bypass normal filters and is using custom filters */
static gboolean is_custom;
static GList *custom_filters;

static gboolean write_socket (void *arg);

#ifndef HAVE_SA_SIGINFO
@@ -112,6 +132,65 @@ rcpt_destruct (void *pointer)
}
}

static void
fin_custom_filters (struct worker_task *task)
{
GList *cur, *curd;
struct custom_filter *filt;
char *output, *log;

cur = custom_filters;
curd = task->rcpt;
while (cur) {
filt = cur->data;
if (filt->after_connect) {
filt->after_connect (&output, &log, curd->data);
if (output != NULL) {
rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE);
g_free (output);
}
if (log != NULL) {
msg_info ("%s", log);
g_free (log);
}
if (curd->next) {
curd = g_list_next (curd);
}
}
cur = g_list_next (cur);
}
}

static gboolean
parse_line_custom (struct worker_task *task, f_str_t *in)
{
GList *cur, *curd;
struct custom_filter *filt;
char *output;
gboolean res = TRUE;

cur = custom_filters;
curd = task->rcpt;
while (cur) {
filt = cur->data;
if (filt->after_connect) {
if (! filt->process_line (in->begin, in->len, &output, curd->data)) {
res = FALSE;
}
if (output != NULL) {
rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE);
g_free (output);
}
if (curd->next) {
curd = g_list_next (curd);
}
}
cur = g_list_next (cur);
}

return res;
}

/*
* Free all structures of worker_task
*/
@@ -123,9 +202,6 @@ free_task (struct worker_task *task, gboolean is_soft)

if (task) {
debug_task ("free pointer %p", task);
if (task->memc_ctx) {
memc_close_ctx (task->memc_ctx);
}
while ((part = g_list_first (task->parts))) {
task->parts = g_list_remove_link (task->parts, part);
p = (struct mime_part *)part->data;
@@ -175,10 +251,19 @@ read_socket (f_str_t * in, void *arg)
switch (task->state) {
case READ_COMMAND:
case READ_HEADER:
if (read_rspamd_input_line (task, in) != 0) {
task->last_error = "Read error";
task->error_code = RSPAMD_NETWORK_ERROR;
task->state = WRITE_ERROR;
if (is_custom) {
if (! parse_line_custom (task, in)) {
task->last_error = "Read error";
task->error_code = RSPAMD_NETWORK_ERROR;
task->state = WRITE_ERROR;
}
}
else {
if (read_rspamd_input_line (task, in) != 0) {
task->last_error = "Read error";
task->error_code = RSPAMD_NETWORK_ERROR;
task->state = WRITE_ERROR;
}
}
if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) {
return write_socket (task);
@@ -238,21 +323,33 @@ write_socket (void *arg)
case WRITE_REPLY:
write_reply (task);
destroy_session (task->s);
if (is_custom) {
fin_custom_filters (task);
}
return FALSE;
break;
case WRITE_ERROR:
write_reply (task);
destroy_session (task->s);
if (is_custom) {
fin_custom_filters (task);
}
return FALSE;
break;
case CLOSING_CONNECTION:
debug_task ("normally closing connection");
destroy_session (task->s);
if (is_custom) {
fin_custom_filters (task);
}
return FALSE;
break;
default:
msg_info ("abnormally closing connection");
destroy_session (task->s);
if (is_custom) {
fin_custom_filters (task);
}
return FALSE;
break;
}
@@ -269,6 +366,9 @@ err_socket (GError * err, void *arg)
msg_info ("abnormally closing connection, error: %s", err->message);
/* Free buffers */
destroy_session (task->s);
if (is_custom) {
fin_custom_filters (task);
}
}

struct worker_task *
@@ -294,6 +394,7 @@ construct_task (struct rspamd_worker *worker)
io_tv.tv_sec = WORKER_IO_TIMEOUT;
io_tv.tv_usec = 0;
new_task->task_pool = memory_pool_new (memory_pool_get_size ());

/* Add destructor for recipients list (it would be better to use anonymous function here */
memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task);
new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
@@ -317,6 +418,9 @@ accept_socket (int fd, short what, void *arg)
struct sockaddr_storage ss;
struct sockaddr_in *sin;
struct worker_task *new_task;
GList *cur;
struct custom_filter *filt;

socklen_t addrlen = sizeof (ss);
int nfd;

@@ -328,8 +432,7 @@ accept_socket (int fd, short what, void *arg)
if (nfd == 0) {
return;
}


new_task = construct_task (worker);

if (ss.ss_family == AF_UNIX) {
@@ -349,7 +452,106 @@ accept_socket (int fd, short what, void *arg)
/* Set up dispatcher */
new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task);
new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
/* Init custom filters */
if (is_custom) {
cur = custom_filters;
while (cur) {
filt = cur->data;
if (filt->before_connect) {
/* XXX: maybe not use rcpt list here for custom filters data, but this can save some bytes in task structure */
new_task->rcpt = g_list_prepend (new_task->rcpt, filt->before_connect ());
}
cur = g_list_next (cur);
}
/* Keep user data in the same order as custom filters */
new_task->rcpt = g_list_reverse (new_task->rcpt);
}
}

static gboolean
load_custom_filter (const char *file)
{
struct custom_filter *filt;
struct stat st;

if (stat (file, &st) == -1 || !S_ISREG (st.st_mode)) {
msg_info ("stat failed for %s", file);
return FALSE;
}

filt = g_malloc (sizeof (struct custom_filter));

filt->handle = g_module_open (file, G_MODULE_BIND_LAZY);
if (!filt->handle) {
msg_info ("module load failed: %s", g_module_error ());
g_free (filt);
return FALSE;
}
/* Now extract functions from custom module */
if (!g_module_symbol (filt->handle, MODULE_INIT_FUNC, (gpointer *)&filt->init_func) ||
!g_module_symbol (filt->handle, MODULE_FINIT_FUNC, (gpointer *)&filt->fin_func) ||
!g_module_symbol (filt->handle, MODULE_BEFORE_CONNECT_FUNC, (gpointer *)&filt->before_connect) ||
!g_module_symbol (filt->handle, MODULE_AFTER_CONNECT_FUNC, (gpointer *)&filt->after_connect) ||
!g_module_symbol (filt->handle, MODULE_PARSE_LINE_FUNC, (gpointer *)&filt->process_line)) {

msg_info ("cannot find handlers in module %s: %s", file, g_module_error ());
g_free (filt);
return FALSE;
}

filt->filename = g_strdup (file);
custom_filters = g_list_prepend (custom_filters, filt);

return TRUE;
}

/*
* Load custom filters from specified path
*/
static gboolean
load_custom_filters (struct rspamd_worker *worker, const char *path)
{
glob_t gp;
int r, i;

gp.gl_offs = 0;
if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) {
msg_warn ("glob failed: %s, %d", strerror (errno), r);
return FALSE;
}
for (i = 0; i < gp.gl_pathc; i ++) {
if (! load_custom_filter (gp.gl_pathv[i])) {
globfree (&gp);
return FALSE;
}
}

globfree (&gp);

return TRUE;
}

static void
unload_custom_filters (void)
{
GList *cur;
struct custom_filter *filt;

cur = custom_filters;
while (cur) {
filt = cur->data;
if (filt->fin_func) {
filt->fin_func ();
}
g_module_close (filt->handle);
g_free (filt);
cur = g_list_next (cur);
}

g_list_free (custom_filters);
}

/*
@@ -360,6 +562,7 @@ start_worker (struct rspamd_worker *worker)
{
struct sigaction signals;
char *is_mime_str;
char *is_custom_str;

#ifdef WITH_PROFILER
extern void _start (void), etext (void);
@@ -383,22 +586,33 @@ start_worker (struct rspamd_worker *worker)
/* Accept event */
event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
event_add (&worker->bind_ev, NULL);

/* Maps events */
start_map_watch ();
/* Check whether we are mime worker */
is_mime_str = g_hash_table_lookup (worker->cf->params, "mime");
if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) {
is_mime = FALSE;
/* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */
is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters");
if (is_custom_str && g_module_supported () && load_custom_filters (worker, is_custom_str)) {
is_custom = TRUE;
}
else {
is_mime = TRUE;
/* Maps events */
start_map_watch ();
/* Check whether we are mime worker */
is_mime_str = g_hash_table_lookup (worker->cf->params, "mime");
if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) {
is_mime = FALSE;
}
else {
is_mime = TRUE;
}
}

event_loop (0);
close_log ();
exit (EXIT_SUCCESS);
if (is_custom) {
unload_custom_filters ();
}
}

/*

Laden…
Annuleren
Opslaan