diff options
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 277 |
1 files changed, 4 insertions, 273 deletions
diff --git a/src/worker.c b/src/worker.c index 2149639de..bb43afba8 100644 --- a/src/worker.c +++ b/src/worker.c @@ -61,26 +61,6 @@ worker_t normal_worker = { SOCK_STREAM /* TCP socket */ }; -#ifndef BUILD_STATIC - -#define MODULE_INIT_FUNC "module_init" -#define MODULE_FINIT_FUNC "module_fin" -#define MODULE_BEFORE_CONNECT_FUNC "before_connect" -#define MODULE_AFTER_CONNECT_FUNC "after_connect" -#define MODULE_PARSE_LINE_FUNC "parse_line" - -struct custom_filter { - gchar *filename; /*< filename */ - GModule *handle; /*< returned by dlopen */ - void (*init_func) (struct config_file * cfg); /*< called at start of worker */ - void *(*before_connect) (void); /*< called when clients connects */ - gboolean (*process_line) (const gchar * line, size_t len, gchar ** output, void *user_data); /*< called when client send data line */ - void (*after_connect) (gchar ** output, gchar ** log_line, void *user_data); /*< called when client disconnects */ - void (*fin_func) (void); -}; - -#endif - /* * Worker's context */ @@ -89,15 +69,12 @@ struct rspamd_worker_ctx { struct timeval io_tv; /* Detect whether this worker is mime worker */ gboolean is_mime; - /* Detect whether this worker is custom worker */ - gboolean is_custom; /* HTTP worker */ gboolean is_http; /* JSON output */ gboolean is_json; /* Allow learning throught worker */ gboolean allow_learn; - GList *custom_filters; /* DNS resolver */ struct rspamd_dns_resolver *resolver; /* Current tasks */ @@ -178,90 +155,6 @@ sigusr1_handler (gint fd, short what, void *arg) return; } -#ifndef BUILD_STATIC -static void -fin_custom_filters (struct worker_task *task) -{ - GList *cur, *curd; - struct custom_filter *filt; - gchar *output = NULL, *log = NULL; - struct rspamd_worker_ctx *ctx = task->worker->ctx; - - cur = ctx->custom_filters; - curd = task->rcpt; - while (cur) { - filt = cur->data; - if (filt->after_connect) { - filt->after_connect (&output, &log, curd->data); - if (output != NULL) { - if (!rspamd_dispatcher_write - (task->dispatcher, output, strlen (output), FALSE, FALSE)){ - g_free (output); - return; - } - g_free (output); - } - if (log != NULL) { - msg_info ("%s", log); - g_free (log); - } - if (curd->next) { - curd = g_list_next (curd); - } - } - cur = g_list_next (cur); - } -} - -static gboolean -parse_line_custom (struct worker_task *task, f_str_t * in) -{ - GList *cur, *curd; - struct custom_filter *filt; - gchar *output = NULL; - gboolean res = TRUE; - struct rspamd_worker_ctx *ctx = task->worker->ctx; - - cur = ctx->custom_filters; - curd = task->rcpt; - while (cur) { - filt = cur->data; - if (filt->after_connect) { - if (!filt->process_line (in->begin, in->len, &output, curd->data)) { - res = FALSE; - } - if (output != NULL) { - if (!rspamd_dispatcher_write - (task->dispatcher, output, strlen (output), FALSE, FALSE)) { - g_free (output); - return FALSE; - } - g_free (output); - } - if (curd->next) { - curd = g_list_next (curd); - } - } - cur = g_list_next (cur); - } - - return res; -} -#else -/* Stubs */ -static void -fin_custom_filters (struct worker_task *task) -{ - -} - -static gboolean -parse_line_custom (struct worker_task *task, f_str_t * in) -{ - return FALSE; -} -#endif - /* * Callback that is called when there is data to read in buffer @@ -278,21 +171,12 @@ read_socket (f_str_t * in, void *arg) switch (task->state) { case READ_COMMAND: case READ_HEADER: - if (ctx->is_custom) { - if (!parse_line_custom (task, in)) { + if (!read_rspamd_input_line (task, in)) { + if (!task->last_error) { task->last_error = "Read error"; task->error_code = RSPAMD_NETWORK_ERROR; - task->state = WRITE_ERROR; - } - } - else { - if (!read_rspamd_input_line (task, in)) { - if (!task->last_error) { - task->last_error = "Read error"; - task->error_code = RSPAMD_NETWORK_ERROR; - } - task->state = WRITE_ERROR; } + task->state = WRITE_ERROR; } if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) { return write_socket (task); @@ -439,9 +323,6 @@ write_socket (void *arg) if (!write_reply (task)) { return FALSE; } - if (ctx->is_custom) { - fin_custom_filters (task); - } destroy_session (task->s); return FALSE; break; @@ -450,17 +331,11 @@ write_socket (void *arg) if (!write_reply (task)) { return FALSE; } - if (ctx->is_custom) { - fin_custom_filters (task); - } destroy_session (task->s); return FALSE; break; case CLOSING_CONNECTION: debug_task ("normally closing connection"); - if (ctx->is_custom) { - fin_custom_filters (task); - } destroy_session (task->s); return FALSE; break; @@ -494,9 +369,6 @@ write_socket (void *arg) break; default: msg_info ("abnormally closing connection at state: %d", task->state); - if (ctx->is_custom) { - fin_custom_filters (task); - } destroy_session (task->s); return FALSE; break; @@ -517,9 +389,6 @@ err_socket (GError * err, void *arg) msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message); /* Free buffers */ - if (ctx->is_custom) { - fin_custom_filters (task); - } g_error_free (err); destroy_session (task->s); } @@ -633,8 +502,6 @@ accept_socket (gint fd, short what, void *arg) struct rspamd_worker_ctx *ctx; union sa_union su; struct worker_task *new_task; - GList *cur; - struct custom_filter *filt; socklen_t addrlen = sizeof (su.ss); gint nfd; @@ -692,123 +559,8 @@ accept_socket (gint fd, short what, void *arg) /* Set up async session */ new_task->s = new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task); - - /* Init custom filters */ -#ifndef BUILD_STATIC - if (ctx->is_custom) { - cur = ctx->custom_filters; - while (cur) { - filt = cur->data; - if (filt->before_connect) { - /* XXX: maybe not use rcpt list here for custom filters data, but this can save some bytes in task structure */ - new_task->rcpt = - g_list_prepend (new_task->rcpt, filt->before_connect ()); - } - cur = g_list_next (cur); - } - /* Keep user data in the same order as custom filters */ - new_task->rcpt = g_list_reverse (new_task->rcpt); - } -#endif -} - -#ifndef BUILD_STATIC -static gboolean -load_custom_filter (struct config_file *cfg, const gchar * file, - struct rspamd_worker_ctx *ctx) -{ - struct custom_filter *filt; - struct stat st; - - if (stat (file, &st) == -1 || !S_ISREG (st.st_mode)) { - msg_info ("stat failed for %s", file); - return FALSE; - } - - filt = g_malloc (sizeof (struct custom_filter)); - - filt->handle = g_module_open (file, G_MODULE_BIND_LAZY); - if (!filt->handle) { - msg_info ("module load failed: %s", g_module_error ()); - g_free (filt); - return FALSE; - } - - /* Now extract functions from custom module */ - if (!g_module_symbol - (filt->handle, MODULE_INIT_FUNC, (gpointer *) & filt->init_func) - || !g_module_symbol (filt->handle, MODULE_FINIT_FUNC, - (gpointer *) & filt->fin_func) - || !g_module_symbol (filt->handle, MODULE_BEFORE_CONNECT_FUNC, - (gpointer *) & filt->before_connect) - || !g_module_symbol (filt->handle, MODULE_AFTER_CONNECT_FUNC, - (gpointer *) & filt->after_connect) - || !g_module_symbol (filt->handle, MODULE_PARSE_LINE_FUNC, - (gpointer *) & filt->process_line)) { - - msg_info ("cannot find handlers in module %s: %s", file, - g_module_error ()); - g_free (filt); - return FALSE; - } - - filt->init_func (cfg); - filt->filename = g_strdup (file); - ctx->custom_filters = g_list_prepend (ctx->custom_filters, filt); - - return TRUE; } -/* - * Load custom filters from specified path - */ -static gboolean -load_custom_filters (struct rspamd_worker *worker, const gchar * path) -{ - glob_t gp; - gint r, i; - struct rspamd_worker_ctx *ctx = worker->ctx; - - gp.gl_offs = 0; - if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) { - msg_warn ("glob failed: %s, %d", strerror (errno), r); - return FALSE; - } - - for (i = 0; i < (gint)gp.gl_pathc; i++) { - if (!load_custom_filter (worker->srv->cfg, gp.gl_pathv[i], ctx)) { - globfree (&gp); - return FALSE; - } - } - - globfree (&gp); - - return TRUE; -} - -static void -unload_custom_filters (struct rspamd_worker_ctx *ctx) -{ - GList *cur; - struct custom_filter *filt; - - cur = ctx->custom_filters; - while (cur) { - filt = cur->data; - if (filt->fin_func) { - filt->fin_func (); - } - g_module_close (filt->handle); - g_free (filt); - cur = g_list_next (cur); - } - - g_list_free (ctx->custom_filters); -} - -#endif - gpointer init_worker (struct config_file *cfg) { @@ -860,7 +612,6 @@ init_worker (struct config_file *cfg) void start_worker (struct rspamd_worker *worker) { - gchar *is_custom_str; struct rspamd_worker_ctx *ctx = worker->ctx; GError *err = NULL; struct lua_locked_state *nL; @@ -877,23 +628,8 @@ start_worker (struct rspamd_worker *worker) event_base_set (ctx->ev_base, &worker->sig_ev_usr1); signal_add (&worker->sig_ev_usr1, NULL); + start_map_watch (worker->srv->cfg, ctx->ev_base); -#ifndef BUILD_STATIC - /* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */ - is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters"); - if (is_custom_str && g_module_supported () - && load_custom_filters (worker, is_custom_str)) { - msg_info ("starting custom process, loaded modules from %s", - is_custom_str); - ctx->is_custom = TRUE; - } - else { -#endif - /* Maps events */ - start_map_watch (worker->srv->cfg, ctx->ev_base); -#ifndef BUILD_STATIC - } -#endif ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg); @@ -910,11 +646,6 @@ start_worker (struct rspamd_worker *worker) event_base_loop (ctx->ev_base, 0); -#ifndef BUILD_STATIC - if (ctx->is_custom) { - unload_custom_filters (ctx); - } -#endif close_log (rspamd_main->logger); exit (EXIT_SUCCESS); |