]> source.dussan.org Git - rspamd.git/commitdiff
* Add support of custom filters in rspamd worker
authorcebka@lenovo-laptop <cebka@lenovo-laptop>
Fri, 29 Jan 2010 16:18:34 +0000 (19:18 +0300)
committercebka@lenovo-laptop <cebka@lenovo-laptop>
Fri, 29 Jan 2010 16:18:34 +0000 (19:18 +0300)
  - custom filters are dlopened and provides callbacks for user's input processing
  - custom filters can be used to extend rspamd functionality for unusual (non email processing cases)
  - custom filters allows to use rspamd async IO model and process management for performing custom network tasks

CMakeLists.txt
config.h.in
src/lmtp.c
src/main.h
src/worker.c

index 445716c8010f602e39c275ab342ab8dc2fe54ca8..db420d556083175b0efb6a0a8ab207a7caebeb64 100644 (file)
@@ -93,7 +93,7 @@ IF(NOT LEX_EXECUTABLE OR NOT YACC_EXECUTABLE)
        MESSAGE(FATAL_ERROR "Error: yacc and lex are required for build")
 ENDIF(NOT LEX_EXECUTABLE OR NOT YACC_EXECUTABLE)
 
-pkg_check_modules(GLIB2 REQUIRED glib-2.0>=2.16)
+pkg_check_modules(GLIB2 REQUIRED glib-2.0>=2.16 gmodule-2.0)
 pkg_check_modules(GMIME2 gmime-2.0)
 
 # Try to link with gmime24
index e672c6d8873288dac7d630ff5fb88d85e9a58336..948a986c49aa12a2f3830fae384a8e4b6dd173dc 100644 (file)
 #include <signal.h>
 #include <event.h>
 #include <glib.h>
+#include <gmodule.h>
 
 #ifndef NO_GMIME
 #include <gmime/gmime.h>
index 636681c07052b6a8e9e2de708b1cc7dfddf93d21..9f7a4805af1eeb57168a188db022e560a94bffc7 100644 (file)
@@ -98,9 +98,6 @@ free_lmtp_task (struct rspamd_lmtp_proto *lmtp, gboolean is_soft)
 
        if (lmtp) {
                debug_task ("free pointer %p", lmtp->task);
-               if (lmtp->task->memc_ctx) {
-                       memc_close_ctx (lmtp->task->memc_ctx);
-               }
                while ((part = g_list_first (lmtp->task->parts))) {
                        lmtp->task->parts = g_list_remove_link (lmtp->task->parts, part);
                        p = (struct mime_part *)part->data;
index ed7d2c80c35252a0818c998ff147e489f8a5fe50..a7cab8676b6ee765c3fc28015aa69c8d6253916b 100644 (file)
@@ -189,7 +189,6 @@ struct worker_task {
        f_str_t *msg;                                                                                           /**< message buffer                                                                     */
        rspamd_io_dispatcher_t *dispatcher;                                                     /**< IO dispatcher object                                                       */
     struct rspamd_async_session* s;                                                            /**< async session object                                                       */
-       memcached_ctx_t *memc_ctx;                                                                      /**< memcached context associated with task                     */
        int parts_count;                                                                                        /**< mime parts count                                                           */
        GMimeMessage *message;                                                                          /**< message, parsed with GMime                                         */
        InternetAddressList *rcpts;                                                                     /**< list of all recipients                                             */
index 2de848cf0e462f7201bb96a819d9afc195e5d185..92c80bc32b7f6a933d82398da346f5db58f6437f 100644 (file)
@@ -50,10 +50,30 @@ extern PerlInterpreter         *perl_interpreter;
 #   include <glib/gprintf.h>
 #endif
 
+#define MODULE_INIT_FUNC "module_init"
+#define MODULE_FINIT_FUNC "module_fin"
+#define MODULE_BEFORE_CONNECT_FUNC "before_connect"
+#define MODULE_AFTER_CONNECT_FUNC "after_connect"
+#define MODULE_PARSE_LINE_FUNC "parse_line"
+
+struct custom_filter {
+       char *filename;                                 /*< filename           */
+       GModule *handle;                                /*< returned by dlopen */
+       void (*init_func)(void);                /*< called at start of worker */
+       void* (*before_connect)(void);  /*< called when clients connects */
+       gboolean (*process_line)(const char *line, size_t len, char **output, void *user_data); /*< called when client send data line */
+       void (*after_connect)(char **output, char **log_line, void *user_data); /*< called when client disconnects */
+       void (*fin_func)(void); 
+};
+
 static struct timeval           io_tv;
 /* Detect whether this worker is mime worker */
 static gboolean                 is_mime;
 
+/* Detect whether this worker bypass normal filters and is using custom filters */
+static gboolean                 is_custom;
+static GList                   *custom_filters;
+
 static gboolean                 write_socket (void *arg);
 
 #ifndef HAVE_SA_SIGINFO
@@ -112,6 +132,65 @@ rcpt_destruct (void *pointer)
        }
 }
 
+static void
+fin_custom_filters (struct worker_task *task)
+{
+       GList                          *cur, *curd;
+       struct custom_filter           *filt;
+       char                           *output, *log;
+
+       cur = custom_filters;
+       curd = task->rcpt;
+       while (cur) {
+               filt = cur->data;
+               if (filt->after_connect) {
+                       filt->after_connect (&output, &log, curd->data);
+                       if (output != NULL) {
+                               rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE);
+                               g_free (output);
+                       }
+                       if (log != NULL) {
+                               msg_info ("%s", log);
+                               g_free (log);
+                       }
+                       if (curd->next) {
+                               curd = g_list_next (curd);
+                       }
+               }
+               cur = g_list_next (cur);
+       }
+}
+
+static gboolean
+parse_line_custom (struct worker_task *task, f_str_t *in)
+{
+       GList                          *cur, *curd;
+       struct custom_filter           *filt;
+       char                           *output;
+       gboolean                        res = TRUE;
+
+       cur = custom_filters;
+       curd = task->rcpt;
+       while (cur) {
+               filt = cur->data;
+               if (filt->after_connect) {
+                       if (! filt->process_line (in->begin, in->len, &output, curd->data)) {
+                               res = FALSE;
+                       }
+                       if (output != NULL) {
+                               rspamd_dispatcher_write (task->dispatcher, output, strlen (output), FALSE, FALSE);
+                               g_free (output);
+                       }
+                       if (curd->next) {
+                               curd = g_list_next (curd);
+                       }
+               }
+               cur = g_list_next (cur);
+       }
+
+       return res;
+}
+
 /*
  * Free all structures of worker_task
  */
@@ -123,9 +202,6 @@ free_task (struct worker_task *task, gboolean is_soft)
 
        if (task) {
                debug_task ("free pointer %p", task);
-               if (task->memc_ctx) {
-                       memc_close_ctx (task->memc_ctx);
-               }
                while ((part = g_list_first (task->parts))) {
                        task->parts = g_list_remove_link (task->parts, part);
                        p = (struct mime_part *)part->data;
@@ -175,10 +251,19 @@ read_socket (f_str_t * in, void *arg)
        switch (task->state) {
        case READ_COMMAND:
        case READ_HEADER:
-               if (read_rspamd_input_line (task, in) != 0) {
-                       task->last_error = "Read error";
-                       task->error_code = RSPAMD_NETWORK_ERROR;
-                       task->state = WRITE_ERROR;
+               if (is_custom) {
+                       if (! parse_line_custom (task, in)) {
+                               task->last_error = "Read error";
+                               task->error_code = RSPAMD_NETWORK_ERROR;
+                               task->state = WRITE_ERROR;
+                       }       
+               }
+               else {
+                       if (read_rspamd_input_line (task, in) != 0) {
+                               task->last_error = "Read error";
+                               task->error_code = RSPAMD_NETWORK_ERROR;
+                               task->state = WRITE_ERROR;
+                       }
                }
                if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) {
                        return write_socket (task);
@@ -238,21 +323,33 @@ write_socket (void *arg)
        case WRITE_REPLY:
                write_reply (task);
                destroy_session (task->s);
+               if (is_custom) {
+                       fin_custom_filters (task);
+               }
                return FALSE;
                break;
        case WRITE_ERROR:
                write_reply (task);
                destroy_session (task->s);
+               if (is_custom) {
+                       fin_custom_filters (task);
+               }
                return FALSE;
                break;
        case CLOSING_CONNECTION:
                debug_task ("normally closing connection");
                destroy_session (task->s);
+               if (is_custom) {
+                       fin_custom_filters (task);
+               }
                return FALSE;
                break;
        default:
                msg_info ("abnormally closing connection");
                destroy_session (task->s);
+               if (is_custom) {
+                       fin_custom_filters (task);
+               }
                return FALSE;
                break;
        }
@@ -269,6 +366,9 @@ err_socket (GError * err, void *arg)
        msg_info ("abnormally closing connection, error: %s", err->message);
        /* Free buffers */
        destroy_session (task->s);
+       if (is_custom) {
+               fin_custom_filters (task);
+       }
 }
 
 struct worker_task             *
@@ -294,6 +394,7 @@ construct_task (struct rspamd_worker *worker)
        io_tv.tv_sec = WORKER_IO_TIMEOUT;
        io_tv.tv_usec = 0;
        new_task->task_pool = memory_pool_new (memory_pool_get_size ());
+
        /* Add destructor for recipients list (it would be better to use anonymous function here */
        memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func) rcpt_destruct, new_task);
        new_task->results = g_hash_table_new (g_str_hash, g_str_equal);
@@ -317,6 +418,9 @@ accept_socket (int fd, short what, void *arg)
        struct sockaddr_storage         ss;
        struct sockaddr_in             *sin;
        struct worker_task             *new_task;
+       GList                          *cur;
+       struct custom_filter           *filt;
+
        socklen_t                       addrlen = sizeof (ss);
        int                             nfd;
 
@@ -328,8 +432,7 @@ accept_socket (int fd, short what, void *arg)
        if (nfd == 0) {
                return;
        }
-
-
+       
        new_task = construct_task (worker);
 
        if (ss.ss_family == AF_UNIX) {
@@ -349,7 +452,106 @@ accept_socket (int fd, short what, void *arg)
        /* Set up dispatcher */
        new_task->dispatcher = rspamd_create_dispatcher (nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &io_tv, (void *)new_task);
        new_task->dispatcher->peer_addr = new_task->client_addr.s_addr;
+       
+       /* Init custom filters */
+       if (is_custom) {
+               cur = custom_filters;
+               while (cur) {
+                       filt = cur->data;
+                       if (filt->before_connect) {
+                               /* XXX: maybe not use rcpt list here for custom filters data, but this can save some bytes in task structure */
+                               new_task->rcpt = g_list_prepend (new_task->rcpt, filt->before_connect ());
+                       }
+                       cur = g_list_next (cur);
+               }
+               /* Keep user data in the same order as custom filters */
+               new_task->rcpt = g_list_reverse (new_task->rcpt);
+       }
+}
+
+static gboolean
+load_custom_filter (const char *file) 
+{
+       struct custom_filter           *filt;
+       struct stat                     st;
+
+       if (stat (file, &st) == -1 || !S_ISREG (st.st_mode)) {
+               msg_info ("stat failed for %s", file);
+               return FALSE;
+       }
+
+       filt = g_malloc (sizeof (struct custom_filter));
+
+       filt->handle = g_module_open (file, G_MODULE_BIND_LAZY);
+       if (!filt->handle) {
+               msg_info ("module load failed: %s", g_module_error ());
+               g_free (filt);
+               return FALSE;
+       }
+       
+       /* Now extract functions from custom module */
+       if (!g_module_symbol (filt->handle, MODULE_INIT_FUNC, (gpointer *)&filt->init_func) ||
+               !g_module_symbol (filt->handle, MODULE_FINIT_FUNC, (gpointer *)&filt->fin_func) ||
+               !g_module_symbol (filt->handle, MODULE_BEFORE_CONNECT_FUNC, (gpointer *)&filt->before_connect) ||
+               !g_module_symbol (filt->handle, MODULE_AFTER_CONNECT_FUNC, (gpointer *)&filt->after_connect) ||
+               !g_module_symbol (filt->handle, MODULE_PARSE_LINE_FUNC, (gpointer *)&filt->process_line)) {
+
+               msg_info ("cannot find handlers in module %s: %s", file, g_module_error ());
+               g_free (filt);
+               return FALSE;
+       }
 
+       filt->filename = g_strdup (file);
+       custom_filters = g_list_prepend (custom_filters, filt);
+
+       return TRUE;
+}
+
+/*
+ * Load custom filters from specified path
+ */
+static gboolean
+load_custom_filters (struct rspamd_worker *worker, const char *path)
+{
+       glob_t                          gp;
+       int                             r, i;
+
+       gp.gl_offs = 0;
+    if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) {
+               msg_warn ("glob failed: %s, %d", strerror (errno), r);
+               return FALSE;
+       }
+       
+       for (i = 0; i < gp.gl_pathc; i ++) {
+               if (! load_custom_filter (gp.gl_pathv[i])) {
+                       globfree (&gp);
+                       return FALSE;
+               }
+       }
+
+       globfree (&gp);
+
+       return TRUE;
+}
+
+static void
+unload_custom_filters (void)
+{
+       GList                          *cur;
+       struct custom_filter           *filt;
+
+       cur = custom_filters;
+       while (cur) {
+               filt = cur->data;
+               if (filt->fin_func) {
+                       filt->fin_func ();
+               }
+               g_module_close (filt->handle);
+               g_free (filt);
+               cur = g_list_next (cur);
+       }
+
+       g_list_free (custom_filters);
 }
 
 /*
@@ -360,6 +562,7 @@ start_worker (struct rspamd_worker *worker)
 {
        struct sigaction                signals;
        char                           *is_mime_str;
+       char                           *is_custom_str;
 
 #ifdef WITH_PROFILER
        extern void                     _start (void), etext (void);
@@ -383,22 +586,33 @@ start_worker (struct rspamd_worker *worker)
        /* Accept event */
        event_set (&worker->bind_ev, worker->cf->listen_sock, EV_READ | EV_PERSIST, accept_socket, (void *)worker);
        event_add (&worker->bind_ev, NULL);
-
-       /* Maps events */
-       start_map_watch ();
-       /* Check whether we are mime worker */
-       is_mime_str = g_hash_table_lookup (worker->cf->params, "mime");
-       if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) {
-               is_mime = FALSE;
+       
+       /* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */
+       is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters");
+       if (is_custom_str && g_module_supported () && load_custom_filters (worker, is_custom_str)) {
+               is_custom = TRUE;
        }
        else {
-               is_mime = TRUE;
+               /* Maps events */
+               start_map_watch ();
+               /* Check whether we are mime worker */
+               is_mime_str = g_hash_table_lookup (worker->cf->params, "mime");
+               if (is_mime_str != NULL && (g_ascii_strcasecmp (is_mime_str, "no") == 0 || g_ascii_strcasecmp (is_mime_str, "false") == 0)) {
+                       is_mime = FALSE;
+               }
+               else {
+                       is_mime = TRUE;
+               }
        }
 
        event_loop (0);
        
        close_log ();
        exit (EXIT_SUCCESS);
+       
+       if (is_custom) {
+               unload_custom_filters ();
+       }
 }
 
 /*