diff options
author | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-01-12 15:20:50 +0000 |
---|---|---|
committer | Vsevolod Stakhov <vsevolod@highsecure.ru> | 2014-01-12 15:20:50 +0000 |
commit | 67b932ad9786743dc032ff2adc3788c5aadd3933 (patch) | |
tree | 084872b519dc54beef83cbdade01af8bb639eafd /src/worker.c | |
parent | d6b454eda97f6ffe8455ec83ab8da6293e7b9c07 (diff) | |
download | rspamd-67b932ad9786743dc032ff2adc3788c5aadd3933.tar.gz rspamd-67b932ad9786743dc032ff2adc3788c5aadd3933.zip |
Start moving to HTTP world.
Diffstat (limited to 'src/worker.c')
-rw-r--r-- | src/worker.c | 107 |
1 files changed, 106 insertions, 1 deletions
diff --git a/src/worker.c b/src/worker.c index 3eb91a289..a36a0ed72 100644 --- a/src/worker.c +++ b/src/worker.c @@ -155,7 +155,7 @@ sigusr1_handler (gint fd, short what, void *arg) return; } - +# if 0 /* * Callback that is called when there is data to read in buffer */ @@ -389,6 +389,7 @@ err_socket (GError * err, void *arg) g_error_free (err); destroy_session (task->s); } +#endif /* * Called if all filters are processed @@ -489,6 +490,104 @@ reduce_tasks_count (gpointer arg) (*tasks) --; } +static gboolean +rspamd_worker_body_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg, + const gchar *chunk, gsize len) +{ + struct worker_task *task = (struct worker_task *) conn->ud; + struct rspamd_worker_ctx *ctx; + ssize_t r; + GError *err = NULL; + + ctx = task->worker->ctx; + + if (msg->body->len == 0) { + msg_err ("got zero length body, cannot continue"); + return FALSE; + } + + task->msg = msg->body; + + debug_task ("got string of length %z", task->msg->len); + + r = process_message (task); + if (r == -1) { + msg_warn ("processing of message failed"); + task->last_error = "MIME processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + return FALSE; + } + if (task->cmd == CMD_OTHER) { + /* Skip filters */ + task->state = WRITE_REPLY; + return FALSE; + } + else if (task->cmd == CMD_LEARN) { + if (!learn_task (task->statfile, task, &err)) { + task->last_error = memory_pool_strdup (task->task_pool, err->message); + task->error_code = err->code; + g_error_free (err); + task->state = WRITE_ERROR; + } + else { + task->last_error = "learn ok"; + task->error_code = 0; + task->state = WRITE_REPLY; + } + return FALSE; + } + else { + if (task->cfg->pre_filters == NULL) { + r = process_filters (task); + if (r == -1) { + task->last_error = "Filter processing error"; + task->error_code = RSPAMD_FILTER_ERROR; + task->state = WRITE_ERROR; + return FALSE; + } + /* Add task to classify to classify pool */ + if (!task->is_skipped && ctx->classify_pool) { + register_async_thread (task->s); + g_thread_pool_push (ctx->classify_pool, task, &err); + if (err != NULL) { + msg_err ("cannot pull task to the pool: %s", err->message); + remove_async_thread (task->s); + } + } + if (task->is_skipped) { + /* Call write_socket to write reply and exit */ + return TRUE; + } + } + else { + lua_call_pre_filters (task); + /* We want fin_task after pre filters are processed */ + task->s->wanna_die = TRUE; + task->state = WAIT_PRE_FILTER; + check_session_pending (task->s); + } + } + return TRUE; +} + +static void +rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) +{ + struct worker_task *task = (struct worker_task *) conn->ud; + + msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message); + destroy_session (task->s); +} + +static void +rspamd_worker_finish_handler (struct rspamd_http_connection *conn, + struct rspamd_http_message *msg) +{ + +} + /* * Accept new connection and construct task */ @@ -550,11 +649,15 @@ accept_socket (gint fd, short what, void *arg) new_task->resolver = ctx->resolver; msec_to_tv (ctx->timeout, &ctx->io_tv); +#if 0 /* Set up dispatcher */ new_task->dispatcher = rspamd_create_dispatcher (ctx->ev_base, nfd, BUFFER_LINE, read_socket, write_socket, err_socket, &ctx->io_tv, (void *) new_task); new_task->dispatcher->peer_addr = new_task->client_addr.s_addr; +#endif + new_task->http_conn = rspamd_http_connection_new (rspamd_worker_body_handler, + rspamd_worker_error_handler, rspamd_worker_finish_handler, 0, RSPAMD_HTTP_SERVER); new_task->ev_base = ctx->ev_base; ctx->tasks ++; memory_pool_add_destructor (new_task->task_pool, (pool_destruct_func)reduce_tasks_count, &ctx->tasks); @@ -562,6 +665,8 @@ accept_socket (gint fd, short what, void *arg) /* Set up async session */ new_task->s = new_async_session (new_task->task_pool, fin_task, restore_task, free_task_hard, new_task); + + rspamd_http_connection_read_message (new_task->http_conn, new_task, nfd, &ctx->io_tv, ctx->ev_base); } gpointer |