aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c277
1 files changed, 4 insertions, 273 deletions
diff --git a/src/worker.c b/src/worker.c
index 2149639de..bb43afba8 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -61,26 +61,6 @@ worker_t normal_worker = {
SOCK_STREAM /* TCP socket */
};
-#ifndef BUILD_STATIC
-
-#define MODULE_INIT_FUNC "module_init"
-#define MODULE_FINIT_FUNC "module_fin"
-#define MODULE_BEFORE_CONNECT_FUNC "before_connect"
-#define MODULE_AFTER_CONNECT_FUNC "after_connect"
-#define MODULE_PARSE_LINE_FUNC "parse_line"
-
-struct custom_filter {
- gchar *filename; /*< filename */
- GModule *handle; /*< returned by dlopen */
- void (*init_func) (struct config_file * cfg); /*< called at start of worker */
- void *(*before_connect) (void); /*< called when clients connects */
- gboolean (*process_line) (const gchar * line, size_t len, gchar ** output, void *user_data); /*< called when client send data line */
- void (*after_connect) (gchar ** output, gchar ** log_line, void *user_data); /*< called when client disconnects */
- void (*fin_func) (void);
-};
-
-#endif
-
/*
* Worker's context
*/
@@ -89,15 +69,12 @@ struct rspamd_worker_ctx {
struct timeval io_tv;
/* Detect whether this worker is mime worker */
gboolean is_mime;
- /* Detect whether this worker is custom worker */
- gboolean is_custom;
/* HTTP worker */
gboolean is_http;
/* JSON output */
gboolean is_json;
/* Allow learning throught worker */
gboolean allow_learn;
- GList *custom_filters;
/* DNS resolver */
struct rspamd_dns_resolver *resolver;
/* Current tasks */
@@ -178,90 +155,6 @@ sigusr1_handler (gint fd, short what, void *arg)
return;
}
-#ifndef BUILD_STATIC
-static void
-fin_custom_filters (struct worker_task *task)
-{
- GList *cur, *curd;
- struct custom_filter *filt;
- gchar *output = NULL, *log = NULL;
- struct rspamd_worker_ctx *ctx = task->worker->ctx;
-
- cur = ctx->custom_filters;
- curd = task->rcpt;
- while (cur) {
- filt = cur->data;
- if (filt->after_connect) {
- filt->after_connect (&output, &log, curd->data);
- if (output != NULL) {
- if (!rspamd_dispatcher_write
- (task->dispatcher, output, strlen (output), FALSE, FALSE)){
- g_free (output);
- return;
- }
- g_free (output);
- }
- if (log != NULL) {
- msg_info ("%s", log);
- g_free (log);
- }
- if (curd->next) {
- curd = g_list_next (curd);
- }
- }
- cur = g_list_next (cur);
- }
-}
-
-static gboolean
-parse_line_custom (struct worker_task *task, f_str_t * in)
-{
- GList *cur, *curd;
- struct custom_filter *filt;
- gchar *output = NULL;
- gboolean res = TRUE;
- struct rspamd_worker_ctx *ctx = task->worker->ctx;
-
- cur = ctx->custom_filters;
- curd = task->rcpt;
- while (cur) {
- filt = cur->data;
- if (filt->after_connect) {
- if (!filt->process_line (in->begin, in->len, &output, curd->data)) {
- res = FALSE;
- }
- if (output != NULL) {
- if (!rspamd_dispatcher_write
- (task->dispatcher, output, strlen (output), FALSE, FALSE)) {
- g_free (output);
- return FALSE;
- }
- g_free (output);
- }
- if (curd->next) {
- curd = g_list_next (curd);
- }
- }
- cur = g_list_next (cur);
- }
-
- return res;
-}
-#else
-/* Stubs */
-static void
-fin_custom_filters (struct worker_task *task)
-{
-
-}
-
-static gboolean
-parse_line_custom (struct worker_task *task, f_str_t * in)
-{
- return FALSE;
-}
-#endif
-
/*
* Callback that is called when there is data to read in buffer
@@ -278,21 +171,12 @@ read_socket (f_str_t * in, void *arg)
switch (task->state) {
case READ_COMMAND:
case READ_HEADER:
- if (ctx->is_custom) {
- if (!parse_line_custom (task, in)) {
+ if (!read_rspamd_input_line (task, in)) {
+ if (!task->last_error) {
task->last_error = "Read error";
task->error_code = RSPAMD_NETWORK_ERROR;
- task->state = WRITE_ERROR;
- }
- }
- else {
- if (!read_rspamd_input_line (task, in)) {
- if (!task->last_error) {
- task->last_error = "Read error";
- task->error_code = RSPAMD_NETWORK_ERROR;
- }
- task->state = WRITE_ERROR;
}
+ task->state = WRITE_ERROR;
}
if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) {
return write_socket (task);
@@ -439,9 +323,6 @@ write_socket (void *arg)
if (!write_reply (task)) {
return FALSE;
}
- if (ctx->is_custom) {
- fin_custom_filters (task);
- }
destroy_session (task->s);
return FALSE;
break;
@@ -450,17 +331,11 @@ write_socket (void *arg)
if (!write_reply (task)) {
return FALSE;
}
- if (ctx->is_custom) {
- fin_custom_filters (task);
- }
destroy_session (task->s);
return FALSE;
break;
case CLOSING_CONNECTION:
debug_task ("normally closing connection");
- if (ctx->is_custom) {
- fin_custom_filters (task);
- }
destroy_session (task->s);
return FALSE;
break;
@@ -494,9 +369,6 @@ write_socket (void *arg)
break;
default:
msg_info ("abnormally closing connection at state: %d", task->state);
- if (ctx->is_custom) {
- fin_custom_filters (task);
- }
destroy_session (task->s);
return FALSE;
break;
@@ -517,9 +389,6 @@ err_socket (GError * err, void *arg)
msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
/* Free buffers */
- if (ctx->is_custom) {
- fin_custom_filters (task);
- }
g_error_free (err);
destroy_session (task->s);
}
@@ -633,8 +502,6 @@ accept_socket (gint fd, short what, void *arg)
struct rspamd_worker_ctx *ctx;
union sa_union su;
struct worker_task *new_task;
- GList *cur;
- struct custom_filter *filt;
socklen_t addrlen = sizeof (su.ss);
gint nfd;
@@ -692,123 +559,8 @@ accept_socket (gint fd, short what, void *arg)
/* Set up async session */
new_task->s =
new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task);
-
- /* Init custom filters */
-#ifndef BUILD_STATIC
- if (ctx->is_custom) {
- cur = ctx->custom_filters;
- while (cur) {
- filt = cur->data;
- if (filt->before_connect) {
- /* XXX: maybe not use rcpt list here for custom filters data, but this can save some bytes in task structure */
- new_task->rcpt =
- g_list_prepend (new_task->rcpt, filt->before_connect ());
- }
- cur = g_list_next (cur);
- }
- /* Keep user data in the same order as custom filters */
- new_task->rcpt = g_list_reverse (new_task->rcpt);
- }
-#endif
-}
-
-#ifndef BUILD_STATIC
-static gboolean
-load_custom_filter (struct config_file *cfg, const gchar * file,
- struct rspamd_worker_ctx *ctx)
-{
- struct custom_filter *filt;
- struct stat st;
-
- if (stat (file, &st) == -1 || !S_ISREG (st.st_mode)) {
- msg_info ("stat failed for %s", file);
- return FALSE;
- }
-
- filt = g_malloc (sizeof (struct custom_filter));
-
- filt->handle = g_module_open (file, G_MODULE_BIND_LAZY);
- if (!filt->handle) {
- msg_info ("module load failed: %s", g_module_error ());
- g_free (filt);
- return FALSE;
- }
-
- /* Now extract functions from custom module */
- if (!g_module_symbol
- (filt->handle, MODULE_INIT_FUNC, (gpointer *) & filt->init_func)
- || !g_module_symbol (filt->handle, MODULE_FINIT_FUNC,
- (gpointer *) & filt->fin_func)
- || !g_module_symbol (filt->handle, MODULE_BEFORE_CONNECT_FUNC,
- (gpointer *) & filt->before_connect)
- || !g_module_symbol (filt->handle, MODULE_AFTER_CONNECT_FUNC,
- (gpointer *) & filt->after_connect)
- || !g_module_symbol (filt->handle, MODULE_PARSE_LINE_FUNC,
- (gpointer *) & filt->process_line)) {
-
- msg_info ("cannot find handlers in module %s: %s", file,
- g_module_error ());
- g_free (filt);
- return FALSE;
- }
-
- filt->init_func (cfg);
- filt->filename = g_strdup (file);
- ctx->custom_filters = g_list_prepend (ctx->custom_filters, filt);
-
- return TRUE;
}
-/*
- * Load custom filters from specified path
- */
-static gboolean
-load_custom_filters (struct rspamd_worker *worker, const gchar * path)
-{
- glob_t gp;
- gint r, i;
- struct rspamd_worker_ctx *ctx = worker->ctx;
-
- gp.gl_offs = 0;
- if ((r = glob (path, GLOB_NOSORT, NULL, &gp)) != 0) {
- msg_warn ("glob failed: %s, %d", strerror (errno), r);
- return FALSE;
- }
-
- for (i = 0; i < (gint)gp.gl_pathc; i++) {
- if (!load_custom_filter (worker->srv->cfg, gp.gl_pathv[i], ctx)) {
- globfree (&gp);
- return FALSE;
- }
- }
-
- globfree (&gp);
-
- return TRUE;
-}
-
-static void
-unload_custom_filters (struct rspamd_worker_ctx *ctx)
-{
- GList *cur;
- struct custom_filter *filt;
-
- cur = ctx->custom_filters;
- while (cur) {
- filt = cur->data;
- if (filt->fin_func) {
- filt->fin_func ();
- }
- g_module_close (filt->handle);
- g_free (filt);
- cur = g_list_next (cur);
- }
-
- g_list_free (ctx->custom_filters);
-}
-
-#endif
-
gpointer
init_worker (struct config_file *cfg)
{
@@ -860,7 +612,6 @@ init_worker (struct config_file *cfg)
void
start_worker (struct rspamd_worker *worker)
{
- gchar *is_custom_str;
struct rspamd_worker_ctx *ctx = worker->ctx;
GError *err = NULL;
struct lua_locked_state *nL;
@@ -877,23 +628,8 @@ start_worker (struct rspamd_worker *worker)
event_base_set (ctx->ev_base, &worker->sig_ev_usr1);
signal_add (&worker->sig_ev_usr1, NULL);
+ start_map_watch (worker->srv->cfg, ctx->ev_base);
-#ifndef BUILD_STATIC
- /* Check if this worker is not usual rspamd worker, but uses custom filters from specified path */
- is_custom_str = g_hash_table_lookup (worker->cf->params, "custom_filters");
- if (is_custom_str && g_module_supported ()
- && load_custom_filters (worker, is_custom_str)) {
- msg_info ("starting custom process, loaded modules from %s",
- is_custom_str);
- ctx->is_custom = TRUE;
- }
- else {
-#endif
- /* Maps events */
- start_map_watch (worker->srv->cfg, ctx->ev_base);
-#ifndef BUILD_STATIC
- }
-#endif
ctx->resolver = dns_resolver_init (ctx->ev_base, worker->srv->cfg);
@@ -910,11 +646,6 @@ start_worker (struct rspamd_worker *worker)
event_base_loop (ctx->ev_base, 0);
-#ifndef BUILD_STATIC
- if (ctx->is_custom) {
- unload_custom_filters (ctx);
- }
-#endif
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);