From e2f2eed337ecdb17c897fe7e04626dfffe32f2a2 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 18 Jan 2014 18:20:54 +0000 Subject: Parse HTTP requests, cleanup the code. --HG-- extra : rebase_source : 6b35fbf55fc9fe65d7f033620670bb210928e9b4 --- src/worker.c | 288 +++++------------------------------------------------------ 1 file changed, 22 insertions(+), 266 deletions(-) (limited to 'src/worker.c') diff --git a/src/worker.c b/src/worker.c index f2c579fdc..4199a809f 100644 --- a/src/worker.c +++ b/src/worker.c @@ -89,8 +89,6 @@ struct rspamd_worker_ctx { struct event_base *ev_base; }; -static gboolean write_socket (void *arg); - static sig_atomic_t wanna_die = 0; #ifndef HAVE_SA_SIGINFO @@ -155,242 +153,6 @@ sigusr1_handler (gint fd, short what, void *arg) return; } -# if 0 -/* - * Callback that is called when there is data to read in buffer - */ -static gboolean -read_socket (f_str_t * in, void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - struct rspamd_worker_ctx *ctx; - ssize_t r; - GError *err = NULL; - - ctx = task->worker->ctx; - switch (task->state) { - case READ_COMMAND: - case READ_HEADER: - if (!read_rspamd_input_line (task, in)) { - if (!task->last_error) { - task->last_error = "Read error"; - task->error_code = RSPAMD_NETWORK_ERROR; - } - task->state = WRITE_ERROR; - } - if (task->state == WRITE_REPLY || task->state == WRITE_ERROR) { - return write_socket (task); - } - break; - case READ_MESSAGE: - /* Allow half-closed connections to be proceed */ - - debug_task ("got string of length %z", task->msg->len); - if (task->content_length > 0) { - task->msg->begin = in->begin; - task->msg->len = in->len; - task->state = WAIT_FILTER; - task->dispatcher->want_read = FALSE; - } - else { - task->dispatcher->want_read = FALSE; - if (in->len > 0) { - if (task->msg->begin == NULL) { - /* Allocate buf */ - task->msg->size = MAX (BUFSIZ, in->len); - task->msg->begin = g_malloc (task->msg->size); - memcpy (task->msg->begin, in->begin, in->len); - task->msg->len = in->len; - } - else if (task->msg->size >= task->msg->len + in->len) { - memcpy (task->msg->begin + task->msg->len, in->begin, in->len); - task->msg->len += in->len; - } - else { - /* Need to realloc */ - task->msg->size = MAX (task->msg->size * 2, task->msg->size + in->len); - task->msg->begin = g_realloc (task->msg->begin, task->msg->size); - memcpy (task->msg->begin + task->msg->len, in->begin, in->len); - task->msg->len += in->len; - } - /* Want more */ - return TRUE; - } - else if (task->msg->len > 0) { - memory_pool_add_destructor (task->task_pool, (pool_destruct_func)g_free, task->msg->begin); - } - else { - msg_warn ("empty message passed"); - task->last_error = "MIME processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - } - - r = process_message (task); - if (r == -1) { - msg_warn ("processing of message failed"); - task->last_error = "MIME processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - if (task->cmd == CMD_OTHER) { - /* Skip filters */ - task->state = WRITE_REPLY; - return write_socket (task); - } - else if (task->cmd == CMD_LEARN) { - if (!learn_task (task->statfile, task, &err)) { - task->last_error = memory_pool_strdup (task->task_pool, err->message); - task->error_code = err->code; - g_error_free (err); - task->state = WRITE_ERROR; - } - else { - task->last_error = "learn ok"; - task->error_code = 0; - task->state = WRITE_REPLY; - } - return write_socket (task); - } - else { - if (task->cfg->pre_filters == NULL) { - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - /* Add task to classify to classify pool */ - if (!task->is_skipped && ctx->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (ctx->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - } - } - if (task->is_skipped) { - /* Call write_socket to write reply and exit */ - return write_socket (task); - } - } - else { - lua_call_pre_filters (task); - /* We want fin_task after pre filters are processed */ - task->s->wanna_die = TRUE; - task->state = WAIT_PRE_FILTER; - check_session_pending (task->s); - } - } - break; - case WRITE_REPLY: - case WRITE_ERROR: - return write_socket (task); - break; - case WAIT_FILTER: - case WAIT_POST_FILTER: - case WAIT_PRE_FILTER: - msg_info ("ignoring trailing garbadge of size %z", in->len); - break; - default: - debug_task ("invalid state on reading stage"); - break; - } - - return TRUE; -} - -/* - * Callback for socket writing - */ -static gboolean -write_socket (void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - struct rspamd_worker_ctx *ctx; - GError *err = NULL; - gint r; - - ctx = task->worker->ctx; - - switch (task->state) { - case WRITE_REPLY: - task->state = WRITING_REPLY; - if (!write_reply (task)) { - return FALSE; - } - destroy_session (task->s); - return FALSE; - break; - case WRITE_ERROR: - task->state = WRITING_REPLY; - if (!write_reply (task)) { - return FALSE; - } - destroy_session (task->s); - return FALSE; - break; - case CLOSING_CONNECTION: - debug_task ("normally closing connection"); - destroy_session (task->s); - return FALSE; - break; - case WRITING_REPLY: - case WAIT_FILTER: - case WAIT_POST_FILTER: - /* Do nothing here */ - break; - case WAIT_PRE_FILTER: - task->state = WAIT_FILTER; - r = process_filters (task); - if (r == -1) { - task->last_error = "Filter processing error"; - task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; - return write_socket (task); - } - /* Add task to classify to classify pool */ - if (!task->is_skipped && ctx->classify_pool) { - register_async_thread (task->s); - g_thread_pool_push (ctx->classify_pool, task, &err); - if (err != NULL) { - msg_err ("cannot pull task to the pool: %s", err->message); - remove_async_thread (task->s); - } - } - if (task->is_skipped) { - /* Call write_socket again to write reply and exit */ - return write_socket (task); - } - break; - default: - msg_info ("abnormally closing connection at state: %d", task->state); - destroy_session (task->s); - return FALSE; - break; - } - return TRUE; -} - -/* - * Called if something goes wrong - */ -static void -err_socket (GError * err, void *arg) -{ - struct worker_task *task = (struct worker_task *) arg; - - msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message); - /* Free buffers */ - g_error_free (err); - destroy_session (task->s); -} -#endif - /* * Called if all filters are processed */ @@ -410,7 +172,7 @@ fin_task (void *arg) task->fin_callback (task->fin_arg); } else { - write_reply (task); + rspamd_protocol_write_reply (task); } return TRUE; } @@ -443,7 +205,7 @@ fin_task (void *arg) task->fin_callback (task->fin_arg); } else { - write_reply (task); + rspamd_protocol_write_reply (task); } } else { @@ -455,7 +217,7 @@ fin_task (void *arg) task->fin_callback (task->fin_arg); } else { - write_reply (task); + rspamd_protocol_write_reply (task); } } else { @@ -465,7 +227,7 @@ fin_task (void *arg) task->last_error = "Filter processing error"; task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_ERROR; - write_reply (task); + rspamd_protocol_write_reply (task); } /* Add task to classify to classify pool */ if (!task->is_skipped && ctx->classify_pool) { @@ -478,7 +240,7 @@ fin_task (void *arg) } } if (task->is_skipped) { - write_reply (task); + rspamd_protocol_write_reply (task); } } } @@ -510,7 +272,7 @@ reduce_tasks_count (gpointer arg) (*tasks) --; } -static gboolean +static gint rspamd_worker_body_handler (struct rspamd_http_connection *conn, struct rspamd_http_message *msg, const gchar *chunk, gsize len) @@ -524,7 +286,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, if (msg->body->len == 0) { msg_err ("got zero length body, cannot continue"); - return FALSE; + return 0; + } + + if (!rspamd_protocol_handle_request (task, msg)) { + task->state = WRITE_ERROR; + return 0; } task->msg = msg->body; @@ -537,26 +304,12 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, task->last_error = "MIME processing error"; task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_ERROR; - return FALSE; + return 0; } if (task->cmd == CMD_OTHER) { /* Skip filters */ task->state = WRITE_REPLY; - return FALSE; - } - else if (task->cmd == CMD_LEARN) { - if (!learn_task (task->statfile, task, &err)) { - task->last_error = memory_pool_strdup (task->task_pool, err->message); - task->error_code = err->code; - g_error_free (err); - task->state = WRITE_ERROR; - } - else { - task->last_error = "learn ok"; - task->error_code = 0; - task->state = WRITE_REPLY; - } - return FALSE; + return 0; } else { if (task->cfg->pre_filters == NULL) { @@ -565,7 +318,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, task->last_error = "Filter processing error"; task->error_code = RSPAMD_FILTER_ERROR; task->state = WRITE_ERROR; - return FALSE; + return 0; } /* Add task to classify to classify pool */ if (!task->is_skipped && ctx->classify_pool) { @@ -578,7 +331,8 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, } if (task->is_skipped) { /* Call write_socket to write reply and exit */ - return TRUE; + task->state = WRITE_REPLY; + return 0; } } else { @@ -589,7 +343,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, check_session_pending (task->s); } } - return TRUE; + return 0; } static void @@ -611,6 +365,9 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn, msg_debug ("normally closing connection from: %s", inet_ntoa (task->client_addr)); destroy_session (task->s); } + else { + check_session_pending (task->s); + } } /* @@ -667,7 +424,6 @@ accept_socket (gint fd, short what, void *arg) new_task->sock = nfd; new_task->is_mime = ctx->is_mime; new_task->is_json = ctx->is_json; - new_task->is_http = ctx->is_http; new_task->allow_learn = ctx->allow_learn; worker->srv->stat->connections_count++; @@ -779,7 +535,7 @@ start_worker (struct rspamd_worker *worker) event_base_loop (ctx->ev_base, 0); - + g_mime_shutdown (); close_log (rspamd_main->logger); exit (EXIT_SUCCESS); } -- cgit v1.2.3