aboutsummaryrefslogtreecommitdiffstats
path: root/src/worker.c
diff options
context:
space:
mode:
authorVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-18 18:20:54 +0000
committerVsevolod Stakhov <vsevolod@highsecure.ru>2014-01-18 18:20:54 +0000
commite2f2eed337ecdb17c897fe7e04626dfffe32f2a2 (patch)
tree25f39a0aef9ed9eacc117407cc3824b16acccb9c /src/worker.c
parentf5933d697d2cd9854afcbc7421efda353e165aea (diff)
downloadrspamd-e2f2eed337ecdb17c897fe7e04626dfffe32f2a2.tar.gz
rspamd-e2f2eed337ecdb17c897fe7e04626dfffe32f2a2.zip
Parse HTTP requests, cleanup the code.
--HG-- extra : rebase_source : 6b35fbf55fc9fe65d7f033620670bb210928e9b4
Diffstat (limited to 'src/worker.c')
-rw-r--r--src/worker.c288
1 files changed, 22 insertions, 266 deletions
diff --git a/src/worker.c b/src/worker.c
index f2c579fdc..4199a809f 100644
--- a/src/worker.c
+++ b/src/worker.c
@@ -89,8 +89,6 @@ struct rspamd_worker_ctx {
struct event_base *ev_base;
};
-static gboolean write_socket (void *arg);
-
static sig_atomic_t wanna_die = 0;
#ifndef HAVE_SA_SIGINFO
@@ -155,242 +153,6 @@ sigusr1_handler (gint fd, short what, void *arg)
return;
}
-# if 0
-/*
- * Callback that is called when there is data to read in buffer
- */
-static gboolean
-read_socket (f_str_t * in, void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
- struct rspamd_worker_ctx *ctx;
- ssize_t r;
- GError *err = NULL;
-
- ctx = task->worker->ctx;
- switch (task->state) {
- case READ_COMMAND:
- case READ_HEADER:
- if (!read_rspamd_input_line (task, in)) {
- if (!task->last_error) {
- task->last_error = "Read error";
- task->error_code = RSPAMD_NETWORK_ERROR;
- }
- task->state = WRITE_ERROR;
- }
- if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) {
- return write_socket (task);
- }
- break;
- case READ_MESSAGE:
- /* Allow half-closed connections to be proceed */
-
- debug_task ("got string of length %z", task->msg->len);
- if (task->content_length > 0) {
- task->msg->begin = in->begin;
- task->msg->len = in->len;
- task->state = WAIT_FILTER;
- task->dispatcher->want_read = FALSE;
- }
- else {
- task->dispatcher->want_read = FALSE;
- if (in->len > 0) {
- if (task->msg->begin == NULL) {
- /* Allocate buf */
- task->msg->size = MAX (BUFSIZ, in->len);
- task->msg->begin = g_malloc (task->msg->size);
- memcpy (task->msg->begin, in->begin, in->len);
- task->msg->len = in->len;
- }
- else if (task->msg->size >= task->msg->len + in->len) {
- memcpy (task->msg->begin + task->msg->len, in->begin, in->len);
- task->msg->len += in->len;
- }
- else {
- /* Need to realloc */
- task->msg->size = MAX (task->msg->size * 2, task->msg->size + in->len);
- task->msg->begin = g_realloc (task->msg->begin, task->msg->size);
- memcpy (task->msg->begin + task->msg->len, in->begin, in->len);
- task->msg->len += in->len;
- }
- /* Want more */
- return TRUE;
- }
- else if (task->msg->len > 0) {
- memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_free, task->msg->begin);
- }
- else {
- msg_warn ("empty message passed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- return write_socket (task);
- }
- }
-
- r = process_message (task);
- if (r == -1) {
- msg_warn ("processing of message failed");
- task->last_error = "MIME processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- return write_socket (task);
- }
- if (task->cmd == CMD_OTHER) {
- /* Skip filters */
- task->state = WRITE_REPLY;
- return write_socket (task);
- }
- else if (task->cmd == CMD_LEARN) {
- if (!learn_task (task->statfile, task, &err)) {
- task->last_error = memory_pool_strdup (task->task_pool, err->message);
- task->error_code = err->code;
- g_error_free (err);
- task->state = WRITE_ERROR;
- }
- else {
- task->last_error = "learn ok";
- task->error_code = 0;
- task->state = WRITE_REPLY;
- }
- return write_socket (task);
- }
- else {
- if (task->cfg->pre_filters == NULL) {
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- return write_socket (task);
- }
- /* Add task to classify to classify pool */
- if (!task->is_skipped && ctx->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (ctx->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- }
- }
- if (task->is_skipped) {
- /* Call write_socket to write reply and exit */
- return write_socket (task);
- }
- }
- else {
- lua_call_pre_filters (task);
- /* We want fin_task after pre filters are processed */
- task->s->wanna_die = TRUE;
- task->state = WAIT_PRE_FILTER;
- check_session_pending (task->s);
- }
- }
- break;
- case WRITE_REPLY:
- case WRITE_ERROR:
- return write_socket (task);
- break;
- case WAIT_FILTER:
- case WAIT_POST_FILTER:
- case WAIT_PRE_FILTER:
- msg_info ("ignoring trailing garbadge of size %z", in->len);
- break;
- default:
- debug_task ("invalid state on reading stage");
- break;
- }
-
- return TRUE;
-}
-
-/*
- * Callback for socket writing
- */
-static gboolean
-write_socket (void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
- struct rspamd_worker_ctx *ctx;
- GError *err = NULL;
- gint r;
-
- ctx = task->worker->ctx;
-
- switch (task->state) {
- case WRITE_REPLY:
- task->state = WRITING_REPLY;
- if (!write_reply (task)) {
- return FALSE;
- }
- destroy_session (task->s);
- return FALSE;
- break;
- case WRITE_ERROR:
- task->state = WRITING_REPLY;
- if (!write_reply (task)) {
- return FALSE;
- }
- destroy_session (task->s);
- return FALSE;
- break;
- case CLOSING_CONNECTION:
- debug_task ("normally closing connection");
- destroy_session (task->s);
- return FALSE;
- break;
- case WRITING_REPLY:
- case WAIT_FILTER:
- case WAIT_POST_FILTER:
- /* Do nothing here */
- break;
- case WAIT_PRE_FILTER:
- task->state = WAIT_FILTER;
- r = process_filters (task);
- if (r == -1) {
- task->last_error = "Filter processing error";
- task->error_code = RSPAMD_FILTER_ERROR;
- task->state = WRITE_ERROR;
- return write_socket (task);
- }
- /* Add task to classify to classify pool */
- if (!task->is_skipped && ctx->classify_pool) {
- register_async_thread (task->s);
- g_thread_pool_push (ctx->classify_pool, task, &err);
- if (err != NULL) {
- msg_err ("cannot pull task to the pool: %s", err->message);
- remove_async_thread (task->s);
- }
- }
- if (task->is_skipped) {
- /* Call write_socket again to write reply and exit */
- return write_socket (task);
- }
- break;
- default:
- msg_info ("abnormally closing connection at state: %d", task->state);
- destroy_session (task->s);
- return FALSE;
- break;
- }
- return TRUE;
-}
-
-/*
- * Called if something goes wrong
- */
-static void
-err_socket (GError * err, void *arg)
-{
- struct worker_task *task = (struct worker_task *) arg;
-
- msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message);
- /* Free buffers */
- g_error_free (err);
- destroy_session (task->s);
-}
-#endif
-
/*
* Called if all filters are processed
*/
@@ -410,7 +172,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
return TRUE;
}
@@ -443,7 +205,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
}
else {
@@ -455,7 +217,7 @@ fin_task (void *arg)
task->fin_callback (task->fin_arg);
}
else {
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
}
else {
@@ -465,7 +227,7 @@ fin_task (void *arg)
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
/* Add task to classify to classify pool */
if (!task->is_skipped && ctx->classify_pool) {
@@ -478,7 +240,7 @@ fin_task (void *arg)
}
}
if (task->is_skipped) {
- write_reply (task);
+ rspamd_protocol_write_reply (task);
}
}
}
@@ -510,7 +272,7 @@ reduce_tasks_count (gpointer arg)
(*tasks) --;
}
-static gboolean
+static gint
rspamd_worker_body_handler (struct rspamd_http_connection *conn,
struct rspamd_http_message *msg,
const gchar *chunk, gsize len)
@@ -524,7 +286,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
if (msg->body->len == 0) {
msg_err ("got zero length body, cannot continue");
- return FALSE;
+ return 0;
+ }
+
+ if (!rspamd_protocol_handle_request (task, msg)) {
+ task->state = WRITE_ERROR;
+ return 0;
}
task->msg = msg->body;
@@ -537,26 +304,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
task->last_error = "MIME processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
- return FALSE;
+ return 0;
}
if (task->cmd == CMD_OTHER) {
/* Skip filters */
task->state = WRITE_REPLY;
- return FALSE;
- }
- else if (task->cmd == CMD_LEARN) {
- if (!learn_task (task->statfile, task, &err)) {
- task->last_error = memory_pool_strdup (task->task_pool, err->message);
- task->error_code = err->code;
- g_error_free (err);
- task->state = WRITE_ERROR;
- }
- else {
- task->last_error = "learn ok";
- task->error_code = 0;
- task->state = WRITE_REPLY;
- }
- return FALSE;
+ return 0;
}
else {
if (task->cfg->pre_filters == NULL) {
@@ -565,7 +318,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
task->last_error = "Filter processing error";
task->error_code = RSPAMD_FILTER_ERROR;
task->state = WRITE_ERROR;
- return FALSE;
+ return 0;
}
/* Add task to classify to classify pool */
if (!task->is_skipped && ctx->classify_pool) {
@@ -578,7 +331,8 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
}
if (task->is_skipped) {
/* Call write_socket to write reply and exit */
- return TRUE;
+ task->state = WRITE_REPLY;
+ return 0;
}
}
else {
@@ -589,7 +343,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn,
check_session_pending (task->s);
}
}
- return TRUE;
+ return 0;
}
static void
@@ -611,6 +365,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn,
msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr));
destroy_session (task->s);
}
+ else {
+ check_session_pending (task->s);
+ }
}
/*
@@ -667,7 +424,6 @@ accept_socket (gint fd, short what, void *arg)
new_task->sock = nfd;
new_task->is_mime = ctx->is_mime;
new_task->is_json = ctx->is_json;
- new_task->is_http = ctx->is_http;
new_task->allow_learn = ctx->allow_learn;
worker->srv->stat->connections_count++;
@@ -779,7 +535,7 @@ start_worker (struct rspamd_worker *worker)
event_base_loop (ctx->ev_base, 0);
-
+ g_mime_shutdown ();
close_log (rspamd_main->logger);
exit (EXIT_SUCCESS);
}