From 7e4a21dc554d42d31afc611fd92aaddd7da1bb6c Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sun, 19 Jan 2014 00:27:54 +0000 Subject: [PATCH] Simplify states of task. Disable broken code. --- src/controller.c | 17 +---------------- src/filter.c | 10 +++------- src/http.c | 4 ++-- src/main.h | 8 ++------ src/protocol.c | 33 ++++++++++++++------------------- src/protocol.h | 13 +++++++------ src/webui.c | 13 ++++++++----- src/worker.c | 46 +++++++++++++++++++++++++++++++++++++++------- src/worker_util.c | 2 +- 9 files changed, 77 insertions(+), 69 deletions(-) diff --git a/src/controller.c b/src/controller.c index 097ed0e7c..269ca6db2 100644 --- a/src/controller.c +++ b/src/controller.c @@ -1282,19 +1282,7 @@ fin_learn_task (void *arg) { struct worker_task *task = (struct worker_task *) arg; - if (task->state != WRITING_REPLY) { - task->state = WRITE_REPLY; - } - - /* Check if we have all events finished */ - if (task->state != WRITING_REPLY) { - if (task->fin_callback) { - task->fin_callback (task->fin_arg); - } - else { - rspamd_dispatcher_restore (task->dispatcher); - } - } + /* XXX: needs to be reworked */ return TRUE; } @@ -1308,9 +1296,6 @@ restore_learn_task (void *arg) struct worker_task *task = (struct worker_task *) arg; /* Special state */ - task->state = WRITING_REPLY; - - rspamd_dispatcher_pause (task->dispatcher); } static gboolean diff --git a/src/filter.c b/src/filter.c index b1448f173..20b924cc0 100644 --- a/src/filter.c +++ b/src/filter.c @@ -271,9 +271,6 @@ process_filters (struct worker_task *task) task->is_skipped = TRUE; task->state = WRITE_REPLY; msg_info ("disable check for message id <%s>, user wants spam", task->message_id); - task->s->wanna_die = TRUE; - check_session_pending (task->s); - return 1; } @@ -286,15 +283,14 @@ process_filters (struct worker_task *task) if (!task->pass_all_filters && metric->actions[METRIC_ACTION_REJECT].score > 0 && check_metric_is_spam (task, metric)) { - task->s->wanna_die = TRUE; - check_session_pending (task->s); + task->state = WRITE_REPLY; return 1; } cur = g_list_next (cur); } } - task->s->wanna_die = TRUE; - check_session_pending (task->s); + + task->state = WAIT_FILTER; return 1; } diff --git a/src/http.c b/src/http.c index fff780e0f..22c202b88 100644 --- a/src/http.c +++ b/src/http.c @@ -712,8 +712,8 @@ rspamd_http_connection_write_message (struct rspamd_http_connection *conn, rspamd_snprintf (datebuf, sizeof (datebuf), "%s, %02d %s %4d %02d:%02d:%02d GMT", http_week[t.tm_wday], t.tm_mday, - http_month[t.tm_mon - 1], - t.tm_year, + http_month[t.tm_mon], + t.tm_year + 1900, t.tm_hour, t.tm_min, t.tm_sec); diff --git a/src/main.h b/src/main.h index ea1172468..3acd5a870 100644 --- a/src/main.h +++ b/src/main.h @@ -176,16 +176,12 @@ struct controller_session { struct worker_task { struct rspamd_worker *worker; /**< pointer to worker object */ enum { - READ_COMMAND, - READ_HEADER, READ_MESSAGE, - WRITE_REPLY, - WRITE_ERROR, WAIT_PRE_FILTER, WAIT_FILTER, WAIT_POST_FILTER, - CLOSING_CONNECTION, - WRITING_REPLY + WRITE_REPLY, + CLOSING_CONNECTION } state; /**< current session state */ enum rspamd_command cmd; /**< command */ struct custom_command *custom_cmd; /**< custom command if any */ diff --git a/src/protocol.c b/src/protocol.c index 3351a5195..23ed859ad 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -687,7 +687,7 @@ rspamd_gstring_append_double (double val, void *ud) return 0; } -static gboolean +static void write_check_reply (struct rspamd_http_message *msg, struct worker_task *task) { GString *logbuf; @@ -743,28 +743,23 @@ write_check_reply (struct rspamd_http_message *msg, struct worker_task *task) /* Increase counters */ task->worker->srv->stat->messages_scanned++; - - rspamd_http_connection_write_message (task->http_conn, msg, NULL, - "application/json", task, task->sock, &task->tv, task->ev_base); - return TRUE; } -gboolean +void rspamd_protocol_write_reply (struct worker_task *task) { struct rspamd_http_message *msg; + const gchar *ctype = "text/plain"; - rspamd_http_connection_reset (task->http_conn); msg = rspamd_http_new_message (HTTP_RESPONSE); msg->date = time (NULL); + task->state = CLOSING_CONNECTION; + debug_task ("writing reply to client"); if (task->error_code != 0) { msg->code = task->error_code; - rspamd_http_connection_write_message (task->http_conn, msg, NULL, - "text/plain", task, task->sock, &task->tv, task->ev_base); - task->state = CLOSING_CONNECTION; - return TRUE; + msg->body = g_string_new (task->last_error); } else { switch (task->cmd) { @@ -774,21 +769,21 @@ rspamd_protocol_write_reply (struct worker_task *task) case CMD_SYMBOLS: case CMD_PROCESS: case CMD_SKIP: - task->state = CLOSING_CONNECTION; - return write_check_reply (msg, task); + ctype = "application/json"; + write_check_reply (msg, task); break; case CMD_PING: - rspamd_http_connection_write_message (task->http_conn, msg, NULL, - "text/plain", task, task->sock, &task->tv, task->ev_base); - task->state = CLOSING_CONNECTION; + msg->body = g_string_new ("pong"); break; case CMD_OTHER: - task->state = CLOSING_CONNECTION; - return task->custom_cmd->func (task); + msg_err ("BROKEN"); + break; } } - return FALSE; + rspamd_http_connection_reset (task->http_conn); + rspamd_http_connection_write_message (task->http_conn, msg, NULL, + ctype, task, task->sock, &task->tv, task->ev_base); } void diff --git a/src/protocol.h b/src/protocol.h index 4b62fdfb2..30d3c51db 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -10,11 +10,12 @@ #include "filter.h" #include "http.h" -#define RSPAMD_FILTER_ERROR 1 -#define RSPAMD_NETWORK_ERROR 2 -#define RSPAMD_PROTOCOL_ERROR 3 -#define RSPAMD_LENGTH_ERROR 4 -#define RSPAMD_STATFILE_ERROR 5 +#define RSPAMD_BASE_ERROR 500 +#define RSPAMD_FILTER_ERROR RSPAMD_BASE_ERROR + 1 +#define RSPAMD_NETWORK_ERROR RSPAMD_BASE_ERROR + 2 +#define RSPAMD_PROTOCOL_ERROR RSPAMD_BASE_ERROR + 3 +#define RSPAMD_LENGTH_ERROR RSPAMD_BASE_ERROR + 4 +#define RSPAMD_STATFILE_ERROR RSPAMD_BASE_ERROR + 5 struct worker_task; struct metric; @@ -52,7 +53,7 @@ gboolean rspamd_protocol_handle_request (struct worker_task *task, struct rspamd * @param task task object * @return 0 if we wrote reply and -1 if there was some error */ -gboolean rspamd_protocol_write_reply (struct worker_task *task); +void rspamd_protocol_write_reply (struct worker_task *task); /** diff --git a/src/webui.c b/src/webui.c index 3553523f8..4472e5d35 100644 --- a/src/webui.c +++ b/src/webui.c @@ -406,7 +406,7 @@ http_scan_task_fin (gpointer arg) { struct scan_callback_data *cbdata = arg; static struct timeval tv = {.tv_sec = 0, .tv_usec = 0 }; - +#if 0 if (cbdata->task->state != WRITING_REPLY) { process_statfiles (cbdata->task); cbdata->task->state = WRITE_REPLY; @@ -426,7 +426,7 @@ http_scan_task_fin (gpointer arg) event_base_once (cbdata->ctx->ev_base, -1, EV_TIMEOUT, http_scan_task_event_helper, cbdata, &tv); } } - +#endif return TRUE; } @@ -439,7 +439,9 @@ http_scan_task_restore (gpointer arg) struct scan_callback_data *cbdata = arg; /* Special state */ +#if 0 cbdata->task->state = WRITING_REPLY; +#endif } /* Prepare callback data for scan */ @@ -565,7 +567,7 @@ http_learn_task_fin (gpointer arg) { struct learn_callback_data *cbdata = arg; static struct timeval tv = {.tv_sec = 0, .tv_usec = 0 }; - +#if 0 if (cbdata->task->state != WRITING_REPLY) { cbdata->task->state = WRITE_REPLY; } @@ -584,7 +586,7 @@ http_learn_task_fin (gpointer arg) event_base_once (cbdata->ctx->ev_base, -1, EV_TIMEOUT, http_learn_task_event_helper, cbdata, &tv); } } - +#endif return TRUE; } @@ -595,9 +597,10 @@ static void http_learn_task_restore (gpointer arg) { struct learn_callback_data *cbdata = arg; - +#if 0 /* Special state */ cbdata->task->state = WRITING_REPLY; +#endif } /* Prepare callback data for learn */ diff --git a/src/worker.c b/src/worker.c index 4199a809f..fc1d7a904 100644 --- a/src/worker.c +++ b/src/worker.c @@ -155,6 +155,7 @@ sigusr1_handler (gint fd, short what, void *arg) /* * Called if all filters are processed + * @return TRUE if session should be terminated */ static gboolean fin_task (void *arg) @@ -219,6 +220,7 @@ fin_task (void *arg) else { rspamd_protocol_write_reply (task); } + return TRUE; } else { task->state = WAIT_FILTER; @@ -226,8 +228,9 @@ fin_task (void *arg) if (r == -1) { task->last_error = "Filter processing error"; task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; + task->state = WRITE_REPLY; rspamd_protocol_write_reply (task); + return TRUE; } /* Add task to classify to classify pool */ if (!task->is_skipped && ctx->classify_pool) { @@ -242,6 +245,9 @@ fin_task (void *arg) if (task->is_skipped) { rspamd_protocol_write_reply (task); } + else { + return FALSE; + } } } @@ -257,7 +263,9 @@ restore_task (void *arg) struct worker_task *task = (struct worker_task *) arg; /* Call post filters */ - lua_call_post_filters (task); + if (task->state == WAIT_POST_FILTER) { + lua_call_post_filters (task); + } task->s->wanna_die = TRUE; } @@ -284,13 +292,18 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, ctx = task->worker->ctx; + if (task->cmd == CMD_PING) { + task->state = WRITE_REPLY; + return 0; + } + if (msg->body->len == 0) { msg_err ("got zero length body, cannot continue"); return 0; } if (!rspamd_protocol_handle_request (task, msg)) { - task->state = WRITE_ERROR; + task->state = WRITE_REPLY; return 0; } @@ -298,12 +311,15 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, debug_task ("got string of length %z", task->msg->len); + /* We got body, set wanna_die flag */ + task->s->wanna_die = TRUE; + r = process_message (task); if (r == -1) { msg_warn ("processing of message failed"); task->last_error = "MIME processing error"; task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; + task->state = WRITE_REPLY; return 0; } if (task->cmd == CMD_OTHER) { @@ -317,7 +333,7 @@ rspamd_worker_body_handler (struct rspamd_http_connection *conn, if (r == -1) { task->last_error = "Filter processing error"; task->error_code = RSPAMD_FILTER_ERROR; - task->state = WRITE_ERROR; + task->state = WRITE_REPLY; return 0; } /* Add task to classify to classify pool */ @@ -351,8 +367,19 @@ rspamd_worker_error_handler (struct rspamd_http_connection *conn, GError *err) { struct worker_task *task = (struct worker_task *) conn->ud; - msg_info ("abnormally closing connection from: %s, error: %s", inet_ntoa (task->client_addr), err->message); - destroy_session (task->s); + msg_info ("abnormally closing connection from: %s, error: %s", + inet_ntoa (task->client_addr), err->message); + if (task->state != CLOSING_CONNECTION) { + /* We still need to write a reply */ + task->error_code = err->code; + task->last_error = err->message; + task->state = WRITE_REPLY; + rspamd_protocol_write_reply (task); + } + else { + /* Terminate session immediately */ + destroy_session (task->s); + } } static void @@ -366,6 +393,11 @@ rspamd_worker_finish_handler (struct rspamd_http_connection *conn, destroy_session (task->s); } else { + /* + * If all filters have finished their tasks, this function will trigger + * writing a reply. + */ + task->s->wanna_die = TRUE; check_session_pending (task->s); } } diff --git a/src/worker_util.c b/src/worker_util.c index 3da6a26a5..c21de269e 100644 --- a/src/worker_util.c +++ b/src/worker_util.c @@ -51,7 +51,7 @@ construct_task (struct rspamd_worker *worker) new_task = g_slice_alloc0 (sizeof (struct worker_task)); new_task->worker = worker; - new_task->state = READ_COMMAND; + new_task->state = READ_MESSAGE; if (worker) { new_task->cfg = worker->srv->cfg; } -- 2.39.5