From 56f520e21f7f164bcd2d99bb46b5875b0a398e75 Mon Sep 17 00:00:00 2001 From: "cebka@lenovo-laptop" Date: Fri, 29 Jan 2010 19:18:34 +0300 Subject: * Add support of custom filters in rspamd worker - custom filters are dlopened and provides callbacks for user's input processing - custom filters can be used to extend rspamd functionality for unusual (non email processing cases) - custom filters allows to use rspamd async IO model and process management for performing custom network tasks --- src/worker.c | 248 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 231 insertions(+), 17 deletions(-) (limited to 'src/worker.c') diff --git a/src/worker.c b/src/worker.c index 2de848cf0..92c80bc32 100644 --- a/src/worker.c +++ b/src/worker.c @@ -50,10 +50,30 @@ extern PerlInterpreter *perl_interpreter; # include #endif +#define MODULE_INIT_FUNC "module_init" +#define MODULE_FINIT_FUNC "module_fin" +#define MODULE_BEFORE_CONNECT_FUNC "before_connect" +#define MODULE_AFTER_CONNECT_FUNC "after_connect" +#define MODULE_PARSE_LINE_FUNC "parse_line" + +struct custom_filter { + char *filename; /*< filename */ + GModule *handle; /*< returned by dlopen */ + void (*init_func)(void); /*< called at start of worker */ + void* (*before_connect)(void); /*< called when clients connects */ + gboolean (*process_line)(const char *line, size_t len, char **output, void *user_data); /*< called when client send data line */ + void (*after_connect)(char **output, char **log_line, void *user_data); /*< called when client disconnects */ + void (*fin_func)(void); +}; + static struct timeval io_tv; /* Detect whether this worker is mime worker */ static gboolean is_mime; +/* Detect whether this worker bypass normal filters and is using custom filters */ +static gboolean is_custom; +static GList *custom_filters; + static gboolean write_socket (void *arg); #ifndef HAVE_SA_SIGINFO @@ -112,6 +132,65 @@ rcpt_destruct (void *pointer) } } +static void +fin_custom_filters (struct worker_task *task) +{ + GList *cur, *curd; + struct custom_filter *filt; + char *output, *log; + + cur = custom_filters; + curd = task->rcpt; + while (cur) { + filt = cur->data; + if (filt->after_connect) { + filt->after_connect (&output, &log, curd->data); + if (output != NULL) { + rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE); + g_free (output); + } + if (log != NULL) { + msg_info ("%s", log); + g_free (log); + } + if (curd->next) { + curd = g_list_next (curd); + } + } + cur = g_list_next (cur); + } +} + +static gboolean +parse_line_custom (struct worker_task *task, f_str_t *in) +{ + GList *cur, *curd; + struct custom_filter *filt; + char *output; + gboolean res = TRUE; + + cur = custom_filters; + curd = task->rcpt; + while (cur) { + filt = cur->data; + if (filt->after_connect) { + if (! filt->process_line (in->begin, in->len, &output, curd->data)) { + res = FALSE; + } + if (output != NULL) { + rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE); + g_free (output); + } + if (curd->next) { + curd = g_list_next (curd); + } + } + cur = g_list_next (cur); + } + + return res; +} + /* * Free all structures of worker_task */ @@ -123,9 +202,6 @@ free_task (struct worker_task *task, gboolean is_soft) if (task) { debug_task ("free pointer %p", task); - if (task->memc_ctx) { - memc_close_ctx (task->memc_ctx); - } while ((part = g_list_first (task->parts))) { task->parts = g_list_remove_link (task->parts, part); p = (struct mime_part *)part->data; @@ -175,10 +251,19 @@ read_socket (f_str_t * in, void *arg) switch (task->state) { case READ_COMMAND: case READ_HEADER: - if (read_rspamd_input_line (task, in) != 0) { - task->last_error = "Read error"; - task->error_code = RSPAMD_NETWORK_ERROR; - task->state = WRITE_ERROR; + if (is_custom) { + if (! parse_line_custom (task, in)) { + task->last_error = "Read error"; + task->error_code = RSPAMD_NETWORK_ERROR; + task->state = WRITE_ERROR; + } + } + else { + if (read_rspamd_input_line (task, in) != 0) { + task->last_error = "Read error"; + task->error_code = RSPAMD_NETWORK_ERROR; + task->state = WRITE_ERROR; + } } if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) { return write_socket (task); @@ -238,21 +323,33 @@ write_socket (void *arg) case WRITE_REPLY: write_reply (task); destroy_session (task->s); + if (is_custom) { + fin_custom_filters (task); + } return FALSE; break; case WRITE_ERROR: write_reply (task); destroy_session (task->s); + if (is_custom) { + fin_custom_filters (task); + } return FALSE; break; case CLOSING_CONNECTION: debug_task ("normally closing connection"); destroy_session (task->s); + if (is_custom) { + fin_custom_filters (task); + } return FALSE; break; default: msg_info ("abnormally closing connection"); destroy_session (task->s); + if (is_custom) { + fin_custom_filters (task); + } return FALSE; break; } @@ -269,6 +366,9 @@ err_socket (GError * err, void *arg) msg_info ("abnormally closing connection, error: %s", err->message); /* Free buffers */ destroy_session (task->s); + if (is_custom) { + fin_custom_filters (task); + } } struct worker_task * @@ -294,6 +394,7 @@ construct_task (struct rspamd_worker *worker) io_tv.tv_sec = WORKER_IO_TIMEOUT; io_tv.tv_usec = 0; new_task->task_pool = memory_pool_new (memory_pool_get_size ()); + /* Add destructor for recipients list (it would be better to use anonymous function here */ memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task); new_task->results = g_hash_table_new (g_str_hash, g_str_equal); @@ -317,6 +418,9 @@ accept_socket (int fd, short what, void *arg) struct sockaddr_storage ss; struct sockaddr_in *sin; struct worker_task *new_task; + GList *cur; + struct custom_filter *filt; + socklen_t addrlen = sizeof (ss); int nfd; @@ -328,8 +432,7 @@ accept_socket (int fd, short what, void *arg) if (nfd == 0) { return; } - - + new_task = construct_task (worker); if (ss.ss_family == AF_UNIX) { @@ -349,7 +452,106 @@ accept_socket (int fd, short what, void *arg) /* Set up dispatcher */ new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task); new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; + + /* Init custom filters */ + if (is_custom) { + cur = custom_filters; + while (cur) { + filt = cur->data; + if (filt->before_connect) { + /* XXX: maybe not use rcpt list here for custom filters data, but this can save some bytes in task structure */ + new_task->rcpt = g_list_prepend (new_task->rcpt, filt->before_connect ()); + } + cur = g_list_next (cur); + } + /* Keep user data in the same order as custom filters */ + new_task->rcpt = g_list_reverse (new_task->rcpt); + } +} + +static gboolean +load_custom_filter (const char *file) +{ + struct custom_filter *filt; + struct stat st; + + if (stat (file, &st) == -1 || !S_ISREG (st.st_mode)) { + msg_info ("stat failed for %s", file); + return FALSE; + } + + filt = g_malloc (sizeof (struct custom_filter)); + + filt->handle = g_module_open (file, G_MODULE_BIND_LAZY); + if (!filt->handle) { + msg_info ("module load failed: %s", g_module_error ()); + g_free (filt); + return FALSE; + } + + /* Now extract functions from custom module */ + if (!g_module_symbol (filt->handle, MODULE_INIT_FUNC, (gpointer *)&filt->init_func) || + !g_module_symbol (filt->handle, MODULE_FINIT_FUNC, (gpointer *)&filt->fin_func) || + !g_module_symbol (filt->handle, MODULE_BEFORE_CONNECT_FUNC, (gpointer *)&filt->before_connect) || + !g_module_symbol (filt->handle, MODULE_AFTER_CONNECT_FUNC, (gpointer *)&filt->after_connect) || + !g_module_symbol (filt->handle, MODULE_PARSE_LINE_FUNC, (gpointer *)&filt->process_line)) { + + msg_info ("cannot find handlers in module %s: %s", file, g_module_error ()); + g_free (filt); + return FALSE; + } + filt->filename = g_strdup (file); + custom_filters = g_list_prepend (custom_filters, filt); + + return TRUE; +} + +/* + * Load custom filters from specified path + */ +static gboolean +load_custom_filters (struct rspamd_worker *worker, const char *path) +{ + glob_t gp; + int r, i; + + gp.gl_offs = 0; + if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) { + msg_warn ("glob failed: %s, %d", strerror (errno), r); + return FALSE; + } + + for (i = 0; i < gp.gl_pathc; i ++) { + if (! load_custom_filter (gp.gl_pathv[i])) { + globfree (&gp); + return FALSE; + } + } + + globfree (&gp); + + return TRUE; +} + +static void +unload_custom_filters (void) +{ + GList *cur; + struct custom_filter *filt; + + cur = custom_filters; + while (cur) { + filt = cur->data; + if (filt->fin_func) { + filt->fin_func (); + } + g_module_close (filt->handle); + g_free (filt); + cur = g_list_next (cur); + } + + g_list_free (custom_filters); } /* @@ -360,6 +562,7 @@ start_worker (struct rspamd_worker *worker) { struct sigaction signals; char *is_mime_str; + char *is_custom_str; #ifdef WITH_PROFILER extern void _start (void), etext (void); @@ -383,22 +586,33 @@ start_worker (struct rspamd_worker *worker) /* Accept event */ event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker); event_add (&worker->bind_ev, NULL); - - /* Maps events */ - start_map_watch (); - /* Check whether we are mime worker */ - is_mime_str = g_hash_table_lookup (worker->cf->params, "mime"); - if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) { - is_mime = FALSE; + + /* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */ + is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters"); + if (is_custom_str && g_module_supported () && load_custom_filters (worker, is_custom_str)) { + is_custom = TRUE; } else { - is_mime = TRUE; + /* Maps events */ + start_map_watch (); + /* Check whether we are mime worker */ + is_mime_str = g_hash_table_lookup (worker->cf->params, "mime"); + if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) { + is_mime = FALSE; + } + else { + is_mime = TRUE; + } } event_loop (0); close_log (); exit (EXIT_SUCCESS); + + if (is_custom) { + unload_custom_filters (); + } } /* -- cgit v1.2.3